You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2019/03/04 07:28:46 UTC

[mnemonic] branch master updated: MNEMONIC-519: Add test cases for mapred based older Hadoop API

This is an automated email from the ASF dual-hosted git repository.

garyw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mnemonic.git


The following commit(s) were added to refs/heads/master by this push:
     new 80180a8  MNEMONIC-519: Add test cases for mapred based older Hadoop API
80180a8 is described below

commit 80180a8240c370f292a464fba5a359129dedb5c6
Author: Kevin Ratnasekera <dj...@yahoo.com>
AuthorDate: Mon Feb 25 20:23:49 2019 +0530

    MNEMONIC-519: Add test cases for mapred based older Hadoop API
---
 .../mnemonic/{mapreduce => common}/Person.java     |   2 +-
 .../{mapreduce => common}/PersonListEFProxy.java   |   2 +-
 .../mnemonic/mapred/MneMapredChunkDataTest.java    | 255 +++++++++++++++++++++
 .../MneMapredLongDataTest.java}                    | 124 +++++-----
 .../MneMapredPersonDataTest.java}                  |  85 +++----
 .../mapreduce/MneMapreducePersonDataTest.java      |   2 +
 6 files changed, 368 insertions(+), 102 deletions(-)

diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
similarity index 98%
rename from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
rename to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
index f61dd80..f753318 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/Person.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.mnemonic.mapreduce;
+package org.apache.mnemonic.common;
 
 import org.apache.mnemonic.Durable;
 import org.apache.mnemonic.EntityFactoryProxy;
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
similarity index 98%
rename from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java
rename to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
index 734fa0e..f16dbb0 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/PersonListEFProxy.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/common/PersonListEFProxy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.mnemonic.mapreduce;
+package org.apache.mnemonic.common;
 
 
 import org.apache.mnemonic.DurableType;
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java
new file mode 100644
index 0000000..521eca7
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredChunkDataTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.mnemonic.DurableChunk;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.apache.mnemonic.sessions.SessionIterator;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+@SuppressWarnings("restriction") // presumably for the sun.misc.Unsafe import — TODO confirm
+public class MneMapredChunkDataTest {
+  // Writes random durable chunks via the mapred MneOutputFormat, then reads them back and compares CRC32.
+  private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "chunk-data";
+  private static final String SERVICE_NAME = "pmalloc";
+  private static final long SLOT_KEY_ID = 5L;
+  private Path m_workdir;
+  private JobConf m_conf;
+  private FileSystem m_fs;
+  private Random m_rand;
+  private TaskAttemptID m_taid;
+  private TaskAttemptContext m_tacontext;
+  private long m_reccnt = 5000L; // number of chunks written by testWriteChunkData
+  private volatile long m_checksum; // CRC32 over all written bytes, set by testWriteChunkData
+  private volatile long m_totalsize = 0L; // sum of the requested chunk sizes
+  private Unsafe unsafe;
+  // Prepares the work directory and the shared job configuration used by every test in this class.
+  @BeforeClass
+  public void setUp() throws Exception {
+    m_workdir = new Path(
+            System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+    m_conf = new JobConf();
+    m_rand = Utils.createRandom();
+    unsafe = Utils.getUnsafe();
+    // Start from an empty work directory on the raw local file system.
+    try {
+      m_fs = FileSystem.getLocal(m_conf).getRaw();
+      m_fs.delete(m_workdir, true);
+      m_fs.mkdirs(m_workdir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+    // Task attempt identity/context that is later handed to the output and input sessions.
+    m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
+    // Output location and base output name; the read tests select files by this name prefix.
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
+    MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
+    // Input and output prefixes get the same memory service, slot key id and CHUNK durable type.
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setDurableTypes(m_conf,
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setMemPoolSize(m_conf,
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+    MneConfigHelper.setDurableTypes(m_conf,
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
+  }
+
+  @AfterClass
+  public void tearDown() {
+    // nothing is cleaned up here; setUp() deletes and recreates the work directory instead
+  }
+  // Allocates one chunk of random size, fills it with random bytes and folds them into cs; may return null.
+  protected DurableChunk<?> genupdDurableChunk(
+          MneDurableOutputSession<DurableChunk<?>> s, Checksum cs) {
+    DurableChunk<?> ret = null;
+    int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024; // 1048576 <= sz < 2097152
+    ret = s.newDurableObjectRecord(sz);
+    byte b;
+    if (null != ret) {
+      for (int i = 0; i < ret.getSize(); ++i) {
+        b = (byte) m_rand.nextInt(255); // NOTE(review): yields 0..254 only, never 0xFF
+        unsafe.putByte(ret.get() + i, b); // ret.get() + i is used as the raw target address
+        cs.update(b);
+      }
+      m_totalsize += sz; // adds the requested size sz, not ret.getSize()
+    }
+    return ret;
+  }
+  // Writes m_reccnt random chunks through the record writer and stores their running CRC32 in m_checksum.
+  @Test(enabled = true)
+  public void testWriteChunkData() throws Exception {
+    NullWritable nada = NullWritable.get();
+    MneDurableOutputSession<DurableChunk<?>> sess =
+            new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
+                    MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    MneDurableOutputValue<DurableChunk<?>> mdvalue =
+            new MneDurableOutputValue<DurableChunk<?>>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
+            new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
+            outputFormat.getRecordWriter(null, m_conf, null, null); // only the JobConf is supplied
+    DurableChunk<?> dchunk = null;
+    Checksum cs = new CRC32();
+    cs.reset();
+    for (int i = 0; i < m_reccnt; ++i) {
+      dchunk = genupdDurableChunk(sess, cs);
+      Assert.assertNotNull(dchunk);
+      writer.write(nada, mdvalue.of(dchunk));
+    }
+    m_checksum = cs.getValue(); // read back by both read tests
+    writer.close(null);
+    sess.close();
+  }
+  // Reads each part file back through a record reader and checks record count, total size and CRC32.
+  @Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
+  public void testReadChunkData() throws Exception {
+    List<String> partfns = new ArrayList<String>();
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles(); // NOTE(review): null if folder is not a directory -> NPE below
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+              && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+              && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(partfns); // keep the order for checksum
+    for (int idx = 0; idx < partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", partfns.get(idx)));
+      FileSplit split = new FileSplit(
+              new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
+      InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
+              new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
+      RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
+              inputFormat.getRecordReader(split, m_conf, null);
+      MneDurableInputValue<DurableChunk<?>> dchkval = null;
+      NullWritable dchkkey = reader.createKey();
+      while (true) {
+        dchkval = reader.createValue(); // a new value holder is requested for every record
+        if (reader.next(dchkkey, dchkval)) {
+          byte b;
+          for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
+            b = unsafe.getByte(dchkval.getValue().get() + j);
+            cs.update(b);
+          }
+          tsize += dchkval.getValue().getSize();
+          ++reccnt;
+        } else {
+          break; // next() returned false: this part file is exhausted
+        }
+      }
+      reader.close();
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d", m_checksum));
+  }
+  // Same checks as testReadChunkData, but all part files are iterated through one MneDurableInputSession.
+  @Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
+  public void testBatchReadChunkDataUsingInputSession() throws Exception {
+    List<String> partfns = new ArrayList<String>();
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles(); // NOTE(review): null if folder is not a directory -> NPE below
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+              && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+              && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(partfns); // keep the order for checksum
+    List<Path> paths = new ArrayList<Path>();
+    for (String fns : partfns) {
+      paths.add(new Path(m_workdir, fns));
+      System.out.println(String.format("[Batch Mode] Added : %s", fns));
+    }
+    MneDurableInputSession<DurableChunk<?>> m_session =
+            new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
+                    paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    SessionIterator<DurableChunk<?>, ?, Void, Void> m_iter = m_session.iterator();
+    DurableChunk<?> val = null;
+    while (m_iter.hasNext()) {
+      val = m_iter.next();
+      byte b;
+      for (int j = 0; j < val.getSize(); ++j) {
+        b = unsafe.getByte(val.get() + j);
+        cs.update(b);
+      }
+      tsize += val.getSize();
+      ++reccnt;
+    } // NOTE(review): m_session is not closed after iteration — confirm whether it should be
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
+  }
+}
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
similarity index 53%
copy from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
copy to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
index 71787f6..4d9c673 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredLongDataTest.java
@@ -16,44 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.mnemonic.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
+package org.apache.mnemonic.mapred;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
-import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
-import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class MneMapreducePersonDataTest {
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class MneMapredLongDataTest {
 
   private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
-  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "person-data";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "long-data";
   private static final String SERVICE_NAME = "pmalloc";
-  private static final long SLOT_KEY_ID = 5L;
+  private static final long SLOT_KEY_ID = 7L;
   private Path m_workdir;
   private JobConf m_conf;
   private FileSystem m_fs;
@@ -61,12 +61,12 @@ public class MneMapreducePersonDataTest {
   private TaskAttemptID m_taid;
   private TaskAttemptContext m_tacontext;
   private long m_reccnt = 500000L;
-  private long m_sumage = 0L;
+  private long m_sum = 0L;
 
   @BeforeClass
   public void setUp() throws IOException {
     m_workdir = new Path(
-        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+            System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
     m_conf = new JobConf();
     m_rand = Utils.createRandom();
 
@@ -82,22 +82,22 @@ public class MneMapreducePersonDataTest {
     m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
 
     MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
-    MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
+    MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
 
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
     MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
     MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setMemPoolSize(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 2);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
   }
 
   @AfterClass
@@ -106,58 +106,60 @@ public class MneMapreducePersonDataTest {
   }
 
   @Test(enabled = true)
-  public void testWritePersonData() throws Exception {
+  public void testWriteLongData() throws Exception {
     NullWritable nada = NullWritable.get();
-    MneDurableOutputSession<Person<Long>> sess =
-        new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
-            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
-    MneDurableOutputValue<Person<Long>> mdvalue =
-        new MneDurableOutputValue<Person<Long>>(sess);
-    OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
-        new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
-    RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
-        outputFormat.getRecordWriter(m_tacontext);
-    Person<Long> person = null;
+    MneDurableOutputSession<Long> sess =
+            new MneDurableOutputSession<Long>(m_tacontext, null,
+                    MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    MneDurableOutputValue<Long> mdvalue =
+            new MneDurableOutputValue<Long>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<Long>> outputFormat =
+            new MneOutputFormat<MneDurableOutputValue<Long>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<Long>> writer =
+            outputFormat.getRecordWriter(null, m_conf, null, null);
+    Long val = null;
     for (int i = 0; i < m_reccnt; ++i) {
-      person = sess.newDurableObjectRecord();
-      person.setAge((short) m_rand.nextInt(50));
-      person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
-      m_sumage += person.getAge();
-      writer.write(nada, mdvalue.of(person));
+      val = m_rand.nextLong();
+      m_sum += val;
+      writer.write(nada, mdvalue.of(val));
     }
-    writer.close(m_tacontext);
+    writer.close(null);
     sess.close();
   }
 
-  @Test(enabled = true, dependsOnMethods = { "testWritePersonData" })
-  public void testReadPersonData() throws Exception {
-    long sumage = 0L;
+  @Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
+  public void testReadLongData() throws Exception {
+    long sum = 0L;
     long reccnt = 0L;
     File folder = new File(m_workdir.toString());
     File[] listfiles = folder.listFiles();
     for (int idx = 0; idx < listfiles.length; ++idx) {
       if (listfiles[idx].isFile()
-          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
-          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+              && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+              && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
         System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
         FileSplit split = new FileSplit(
-            new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
-        InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
-            new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
-        RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
-            inputFormat.createRecordReader(split, m_tacontext);
-        MneDurableInputValue<Person<Long>> personval = null;
-        while (reader.nextKeyValue()) {
-          personval = reader.getCurrentValue();
-          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
-          sumage += personval.getValue().getAge();
-          ++reccnt;
+                new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
+        InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
+                new MneInputFormat<MneDurableInputValue<Long>, Long>();
+        RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
+                inputFormat.getRecordReader(split, m_conf, null);
+        MneDurableInputValue<Long> mdval = null;
+        NullWritable mdkey = reader.createKey();
+        while (true) {
+          mdval = reader.createValue();
+          if (reader.next(mdkey, mdval)) {
+            sum += mdval.getValue();
+            ++reccnt;
+          } else {
+            break;
+          }
         }
         reader.close();
       }
     }
+    AssertJUnit.assertEquals(m_sum, sum);
     AssertJUnit.assertEquals(m_reccnt, reccnt);
-    AssertJUnit.assertEquals(m_sumage, sumage);
-    System.out.println(String.format("The checksum of ages is %d", sumage));
+    System.out.println(String.format("The checksum of long data is %d", sum));
   }
 }
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
similarity index 67%
copy from mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
copy to mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
index 71787f6..7a2a6e4 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredPersonDataTest.java
@@ -16,39 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.mnemonic.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
+package org.apache.mnemonic.mapred;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
-import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
-import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.apache.mnemonic.common.Person;
+import org.apache.mnemonic.common.PersonListEFProxy;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class MneMapreducePersonDataTest {
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class MneMapredPersonDataTest {
 
   private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
   private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "person-data";
@@ -66,7 +68,7 @@ public class MneMapreducePersonDataTest {
   @BeforeClass
   public void setUp() throws IOException {
     m_workdir = new Path(
-        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+            System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
     m_conf = new JobConf();
     m_rand = Utils.createRandom();
 
@@ -87,17 +89,17 @@ public class MneMapreducePersonDataTest {
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
     MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+            MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
     MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setMemPoolSize(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
   }
 
   @AfterClass
@@ -109,14 +111,14 @@ public class MneMapreducePersonDataTest {
   public void testWritePersonData() throws Exception {
     NullWritable nada = NullWritable.get();
     MneDurableOutputSession<Person<Long>> sess =
-        new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
-            MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+            new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
+                    MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
     MneDurableOutputValue<Person<Long>> mdvalue =
-        new MneDurableOutputValue<Person<Long>>(sess);
+            new MneDurableOutputValue<Person<Long>>(sess);
     OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
-        new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
+            new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
     RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
-        outputFormat.getRecordWriter(m_tacontext);
+            outputFormat.getRecordWriter(null, m_conf, null, null);
     Person<Long> person = null;
     for (int i = 0; i < m_reccnt; ++i) {
       person = sess.newDurableObjectRecord();
@@ -125,11 +127,11 @@ public class MneMapreducePersonDataTest {
       m_sumage += person.getAge();
       writer.write(nada, mdvalue.of(person));
     }
-    writer.close(m_tacontext);
+    writer.close(null);
     sess.close();
   }
 
-  @Test(enabled = true, dependsOnMethods = { "testWritePersonData" })
+  @Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
   public void testReadPersonData() throws Exception {
     long sumage = 0L;
     long reccnt = 0L;
@@ -137,21 +139,26 @@ public class MneMapreducePersonDataTest {
     File[] listfiles = folder.listFiles();
     for (int idx = 0; idx < listfiles.length; ++idx) {
       if (listfiles[idx].isFile()
-          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
-          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+              && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+              && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
         System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
         FileSplit split = new FileSplit(
-            new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
+                new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
         InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
-            new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
+                new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
         RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
-            inputFormat.createRecordReader(split, m_tacontext);
+                inputFormat.getRecordReader(split, m_conf, null);
         MneDurableInputValue<Person<Long>> personval = null;
-        while (reader.nextKeyValue()) {
-          personval = reader.getCurrentValue();
-          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
-          sumage += personval.getValue().getAge();
-          ++reccnt;
+        NullWritable personkey = reader.createKey();
+        while (true) {
+          personval = reader.createValue();
+          if (reader.next(personkey, personval)) {
+            AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
+            sumage += personval.getValue().getAge();
+            ++reccnt;
+          } else {
+            break;
+          }
         }
         reader.close();
       }
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 71787f6..a72fec7 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -43,6 +43,8 @@ import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
 import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.common.Person;
+import org.apache.mnemonic.common.PersonListEFProxy;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;