You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2019/03/04 07:28:46 UTC
[mnemonic] branch master updated: MNEMONIC-519: Add test cases for
mapred based older Hadoop API
This is an automated email from the ASF dual-hosted git repository.
garyw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mnemonic.git
The following commit(s) were added to refs/heads/master by this push:
new 80180a8 MNEMONIC-519: Add test cases for mapred based older Hadoop API
80180a8 is described below
commit 80180a8240c370f292a464fba5a359129dedb5c6
Author: Kevin Ratnasekera <dj...@yahoo.com>
AuthorDate: Mon Feb 25 20:23:49 2019 +0530
MNEMONIC-519: Add test cases for mapred based older Hadoop API
---
.../mnemonic/{mapreduce => common}/Person.java | 2 +-
.../{mapreduce => common}/PersonListEFProxy.java | 2 +-
.../mnemonic/mapred/MneMapredChunkDataTest.java | 255 +++++++++++++++++++++
.../MneMapredLongDataTest.java} | 124 +++++-----
.../MneMapredPersonDataTest.java} | 85 +++----
.../mapreduce/MneMapreducePersonDataTest.java | 2 +
6 files changed, 368 insertions(+), 102 deletions(-)
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
similarity index 98%
rename from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
rename to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
index f61dd80..f753318 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.mnemonic.mapreduce;
+package org.apache.mnemonic.common;
import org.apache.mnemonic.Durable;
import org.apache.mnemonic.EntityFactoryProxy;
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
similarity index 98%
rename from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java
rename to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
index 734fa0e..f16dbb0 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.mnemonic.mapreduce;
+package org.apache.mnemonic.common;
import org.apache.mnemonic.DurableType;
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java
new file mode 100644
index 0000000..521eca7
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.mnemonic.DurableChunk;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.apache.mnemonic.sessions.SessionIterator;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * Round-trip test for {@link DurableChunk} records using the older
+ * {@code org.apache.hadoop.mapred} flavour of the Mnemonic input/output formats.
+ *
+ * <p>{@link #testWriteChunkData()} writes {@code m_reccnt} randomly sized and
+ * randomly filled chunks and records their count, total size and CRC32. The two
+ * read tests then read the generated part files back (through a RecordReader and
+ * through a MneDurableInputSession respectively) and compare the same three figures.
+ */
+@SuppressWarnings("restriction")
+public class MneMapredChunkDataTest {
+
+  private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "chunk-data";
+  private static final String SERVICE_NAME = "pmalloc";
+  private static final long SLOT_KEY_ID = 5L;
+  private Path m_workdir;
+  private JobConf m_conf;
+  private FileSystem m_fs;
+  private Random m_rand;
+  private TaskAttemptID m_taid;
+  private TaskAttemptContext m_tacontext;
+  // number of chunk records written by the write test and expected by the read tests
+  private long m_reccnt = 5000L;
+  // CRC32 over every byte written, in write order; set by testWriteChunkData()
+  private volatile long m_checksum;
+  // sum of the requested sizes of all chunks written
+  private volatile long m_totalsize = 0L;
+  // used for raw byte access at the address returned by DurableChunk.get()
+  private Unsafe unsafe;
+
+  /**
+   * Prepares an empty work directory on the raw local file system and fills the
+   * job configuration with the input and output settings shared by all tests.
+   *
+   * @throws Exception if the helpers used for initialization fail
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    m_workdir = new Path(
+        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+    m_conf = new JobConf();
+    m_rand = Utils.createRandom();
+    unsafe = Utils.getUnsafe();
+
+    try {
+      m_fs = FileSystem.getLocal(m_conf).getRaw();
+      // start from a clean directory so that part files of earlier runs are not picked up
+      m_fs.delete(m_workdir, true);
+      m_fs.mkdirs(m_workdir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+
+    m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
+
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
+    MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
+
+    // input side
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
+
+    // output side; the pool size below is 4 GiB
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setMemPoolSize(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
+  }
+
+  /**
+   * Intentionally empty: the generated part files are left in the work directory.
+   */
+  @AfterClass
+  public void tearDown() {
+
+  }
+
+  /**
+   * Allocates a new chunk record of random size (1 MiB inclusive to 2 MiB exclusive),
+   * fills it with random bytes and feeds every byte into the given checksum.
+   *
+   * @param s  the output session used to allocate the chunk
+   * @param cs the running checksum to update with the generated bytes
+   * @return the filled chunk, or {@code null} if the session returned no chunk
+   */
+  protected DurableChunk<?> genupdDurableChunk(
+      MneDurableOutputSession<DurableChunk<?>> s, Checksum cs) {
+    int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024;
+    DurableChunk<?> ret = s.newDurableObjectRecord(sz);
+    if (null != ret) {
+      long base = ret.get();
+      long size = ret.getSize();
+      for (long off = 0; off < size; ++off) {
+        // bound 256 so that every byte value, including 0xFF, can occur
+        byte b = (byte) m_rand.nextInt(256);
+        unsafe.putByte(base + off, b);
+        cs.update(b);
+      }
+      m_totalsize += sz;
+    }
+    return ret;
+  }
+
+  /**
+   * Feeds every byte of the given chunk into the checksum.
+   *
+   * @param chunk the chunk whose content is read through {@code unsafe}
+   * @param cs    the running checksum to update
+   * @return the size of the chunk as reported by {@code getSize()}
+   */
+  private long checksumChunk(DurableChunk<?> chunk, Checksum cs) {
+    long base = chunk.get();
+    long size = chunk.getSize();
+    for (long off = 0; off < size; ++off) {
+      cs.update(unsafe.getByte(base + off));
+    }
+    return size;
+  }
+
+  /**
+   * Collects the names of the part files in the work directory, i.e. the regular
+   * files whose name starts with the configured base output name and ends with
+   * the default Mnemonic file extension.
+   *
+   * @return the matching file names in sorted order, so that the chunks are
+   *         visited in a stable order for the checksum comparison
+   */
+  private List<String> listPartFileNames() {
+    File[] candidates = new File(m_workdir.toString()).listFiles();
+    // listFiles() yields null when the directory is missing or unreadable
+    Assert.assertNotNull(candidates, "cannot list work directory " + m_workdir);
+    String prefix = MneConfigHelper.getBaseOutputName(m_conf, null);
+    List<String> names = new ArrayList<String>();
+    for (File candidate : candidates) {
+      String name = candidate.getName();
+      if (candidate.isFile()
+          && name.startsWith(prefix)
+          && name.endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        names.add(name);
+      }
+    }
+    Collections.sort(names);
+    return names;
+  }
+
+  /**
+   * Writes {@code m_reccnt} random chunks through the mapred record writer and
+   * remembers their CRC32 in {@code m_checksum}.
+   *
+   * @throws Exception if allocation or writing fails
+   */
+  @Test(enabled = true)
+  public void testWriteChunkData() throws Exception {
+    NullWritable nada = NullWritable.get();
+    MneDurableOutputSession<DurableChunk<?>> sess =
+        new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    try {
+      MneDurableOutputValue<DurableChunk<?>> mdvalue =
+          new MneDurableOutputValue<DurableChunk<?>>(sess);
+      OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
+          new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
+      RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
+          outputFormat.getRecordWriter(null, m_conf, null, null);
+      try {
+        Checksum cs = new CRC32();
+        for (int i = 0; i < m_reccnt; ++i) {
+          DurableChunk<?> dchunk = genupdDurableChunk(sess, cs);
+          Assert.assertNotNull(dchunk);
+          writer.write(nada, mdvalue.of(dchunk));
+        }
+        m_checksum = cs.getValue();
+      } finally {
+        // close even when an assertion or write fails; writer first, then the session
+        writer.close(null);
+      }
+    } finally {
+      sess.close();
+    }
+  }
+
+  /**
+   * Reads every part file back through a mapred record reader and checks record
+   * count, total size and CRC32 against the values recorded by the write test.
+   *
+   * @throws Exception if reading fails
+   */
+  @Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
+  public void testReadChunkData() throws Exception {
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    for (String partfn : listPartFileNames()) {
+      System.out.println(String.format("Verifying : %s", partfn));
+      FileSplit split = new FileSplit(
+          new Path(m_workdir, partfn), 0, 0L, new String[0]);
+      InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
+          new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
+      RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
+          inputFormat.getRecordReader(split, m_conf, null);
+      try {
+        NullWritable dchkkey = reader.createKey();
+        while (true) {
+          // a fresh value holder is requested for every record
+          MneDurableInputValue<DurableChunk<?>> dchkval = reader.createValue();
+          if (!reader.next(dchkkey, dchkval)) {
+            break;
+          }
+          tsize += checksumChunk(dchkval.getValue(), cs);
+          ++reccnt;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d", m_checksum));
+  }
+
+  /**
+   * Reads all part files in one pass through a single input session and checks
+   * record count, total size and CRC32 against the values recorded by the write test.
+   *
+   * @throws Exception if reading fails
+   */
+  @Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
+  public void testBatchReadChunkDataUsingInputSession() throws Exception {
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    List<Path> paths = new ArrayList<Path>();
+    for (String fns : listPartFileNames()) {
+      paths.add(new Path(m_workdir, fns));
+      System.out.println(String.format("[Batch Mode] Added : %s", fns));
+    }
+    // NOTE(review): the session and its iterator are not closed here, as in the
+    // original test; confirm whether MneDurableInputSession needs an explicit close.
+    MneDurableInputSession<DurableChunk<?>> session =
+        new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
+            paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    SessionIterator<DurableChunk<?>, ?, Void, Void> iter = session.iterator();
+    while (iter.hasNext()) {
+      tsize += checksumChunk(iter.next(), cs);
+      ++reccnt;
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
+  }
+}
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
similarity index 53%
copy from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
copy to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
index 71787f6..4d9c673 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
@@ -16,44 +16,44 @@
* limitations under the License.
*/
-package org.apache.mnemonic.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
+package org.apache.mnemonic.mapred;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.Utils;
import org.apache.mnemonic.hadoop.MneConfigHelper;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
import org.apache.mnemonic.hadoop.MneDurableOutputSession;
import org.apache.mnemonic.hadoop.MneDurableOutputValue;
-import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
-import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class MneMapreducePersonDataTest {
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class MneMapredLongDataTest {
private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
- private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "person-data";
+ private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "long-data";
private static final String SERVICE_NAME = "pmalloc";
- private static final long SLOT_KEY_ID = 5L;
+ private static final long SLOT_KEY_ID = 7L;
private Path m_workdir;
private JobConf m_conf;
private FileSystem m_fs;
@@ -61,12 +61,12 @@ public class MneMapreducePersonDataTest {
private TaskAttemptID m_taid;
private TaskAttemptContext m_tacontext;
private long m_reccnt = 500000L;
- private long m_sumage = 0L;
+ private long m_sum = 0L;
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
- System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+ System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
@@ -82,22 +82,22 @@ public class MneMapreducePersonDataTest {
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
- MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
+ MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
- MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
- MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 2);
MneConfigHelper.setDurableTypes(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
}
@AfterClass
@@ -106,58 +106,60 @@ public class MneMapreducePersonDataTest {
}
@Test(enabled = true)
- public void testWritePersonData() throws Exception {
+ public void testWriteLongData() throws Exception {
NullWritable nada = NullWritable.get();
- MneDurableOutputSession<Person<Long>> sess =
- new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
- MneDurableOutputValue<Person<Long>> mdvalue =
- new MneDurableOutputValue<Person<Long>>(sess);
- OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
- new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
- RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
- outputFormat.getRecordWriter(m_tacontext);
- Person<Long> person = null;
+ MneDurableOutputSession<Long> sess =
+ new MneDurableOutputSession<Long>(m_tacontext, null,
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+ MneDurableOutputValue<Long> mdvalue =
+ new MneDurableOutputValue<Long>(sess);
+ OutputFormat<NullWritable, MneDurableOutputValue<Long>> outputFormat =
+ new MneOutputFormat<MneDurableOutputValue<Long>>();
+ RecordWriter<NullWritable, MneDurableOutputValue<Long>> writer =
+ outputFormat.getRecordWriter(null, m_conf, null, null);
+ Long val = null;
for (int i = 0; i < m_reccnt; ++i) {
- person = sess.newDurableObjectRecord();
- person.setAge((short) m_rand.nextInt(50));
- person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
- m_sumage += person.getAge();
- writer.write(nada, mdvalue.of(person));
+ val = m_rand.nextLong();
+ m_sum += val;
+ writer.write(nada, mdvalue.of(val));
}
- writer.close(m_tacontext);
+ writer.close(null);
sess.close();
}
- @Test(enabled = true, dependsOnMethods = { "testWritePersonData" })
- public void testReadPersonData() throws Exception {
- long sumage = 0L;
+ @Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
+ public void testReadLongData() throws Exception {
+ long sum = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
- && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
- && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+ && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+ && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
- new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
- InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
- new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
- RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
- inputFormat.createRecordReader(split, m_tacontext);
- MneDurableInputValue<Person<Long>> personval = null;
- while (reader.nextKeyValue()) {
- personval = reader.getCurrentValue();
- AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
- sumage += personval.getValue().getAge();
- ++reccnt;
+ new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
+ InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
+ new MneInputFormat<MneDurableInputValue<Long>, Long>();
+ RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
+ inputFormat.getRecordReader(split, m_conf, null);
+ MneDurableInputValue<Long> mdval = null;
+ NullWritable mdkey = reader.createKey();
+ while (true) {
+ mdval = reader.createValue();
+ if (reader.next(mdkey, mdval)) {
+ sum += mdval.getValue();
+ ++reccnt;
+ } else {
+ break;
+ }
}
reader.close();
}
}
+ AssertJUnit.assertEquals(m_sum, sum);
AssertJUnit.assertEquals(m_reccnt, reccnt);
- AssertJUnit.assertEquals(m_sumage, sumage);
- System.out.println(String.format("The checksum of ages is %d", sumage));
+ System.out.println(String.format("The checksum of long data is %d", sum));
}
}
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
similarity index 67%
copy from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
copy to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
index 71787f6..7a2a6e4 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
@@ -16,39 +16,41 @@
* limitations under the License.
*/
-package org.apache.mnemonic.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
+package org.apache.mnemonic.mapred;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.Utils;
import org.apache.mnemonic.hadoop.MneConfigHelper;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
import org.apache.mnemonic.hadoop.MneDurableOutputSession;
import org.apache.mnemonic.hadoop.MneDurableOutputValue;
-import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
-import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.apache.mnemonic.common.Person;
+import org.apache.mnemonic.common.PersonListEFProxy;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class MneMapreducePersonDataTest {
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class MneMapredPersonDataTest {
private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "person-data";
@@ -66,7 +68,7 @@ public class MneMapreducePersonDataTest {
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
- System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+ System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
@@ -87,17 +89,17 @@ public class MneMapreducePersonDataTest {
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
- MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
- MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
MneConfigHelper.setDurableTypes(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
}
@AfterClass
@@ -109,14 +111,14 @@ public class MneMapreducePersonDataTest {
public void testWritePersonData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Person<Long>> sess =
- new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
- MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+ new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Person<Long>> mdvalue =
- new MneDurableOutputValue<Person<Long>>(sess);
+ new MneDurableOutputValue<Person<Long>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
- new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
+ new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
- outputFormat.getRecordWriter(m_tacontext);
+ outputFormat.getRecordWriter(null, m_conf, null, null);
Person<Long> person = null;
for (int i = 0; i < m_reccnt; ++i) {
person = sess.newDurableObjectRecord();
@@ -125,11 +127,11 @@ public class MneMapreducePersonDataTest {
m_sumage += person.getAge();
writer.write(nada, mdvalue.of(person));
}
- writer.close(m_tacontext);
+ writer.close(null);
sess.close();
}
- @Test(enabled = true, dependsOnMethods = { "testWritePersonData" })
+ @Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
public void testReadPersonData() throws Exception {
long sumage = 0L;
long reccnt = 0L;
@@ -137,21 +139,26 @@ public class MneMapreducePersonDataTest {
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
- && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
- && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+ && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+ && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
- new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
+ new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
- new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
+ new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
- inputFormat.createRecordReader(split, m_tacontext);
+ inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<Person<Long>> personval = null;
- while (reader.nextKeyValue()) {
- personval = reader.getCurrentValue();
- AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
- sumage += personval.getValue().getAge();
- ++reccnt;
+ NullWritable personkey = reader.createKey();
+ while (true) {
+ personval = reader.createValue();
+ if (reader.next(personkey, personval)) {
+ AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
+ sumage += personval.getValue().getAge();
+ ++reccnt;
+ } else {
+ break;
+ }
}
reader.close();
}
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 71787f6..a72fec7 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -43,6 +43,8 @@ import org.apache.mnemonic.hadoop.MneDurableOutputSession;
import org.apache.mnemonic.hadoop.MneDurableOutputValue;
import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.common.Person;
+import org.apache.mnemonic.common.PersonListEFProxy;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;