You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/15 20:17:39 UTC

incubator-mnemonic git commit: MNEMONIC-216: Support Hadoop mapred APIs MNEMONIC-220: Remove the dependency on OutputDir of FileOutputFormat

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 1705665a5 -> 224ef2fec


MNEMONIC-216: Support Hadoop mapred APIs
MNEMONIC-220: Remove the dependency on OutputDir of FileOutputFormat


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/224ef2fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/224ef2fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/224ef2fe

Branch: refs/heads/master
Commit: 224ef2feceaa0da78b38f306f94f73b7d9a4d53e
Parents: 1705665
Author: paley <pa...@gmail.com>
Authored: Wed Mar 15 09:46:48 2017 -0700
Committer: paley <pa...@gmail.com>
Committed: Wed Mar 15 12:57:38 2017 -0700

----------------------------------------------------------------------
 build-tools/test.conf                           |   3 +
 .../apache/mnemonic/hadoop/MneConfigHelper.java |   9 +
 .../mnemonic/hadoop/MneDurableInputSession.java |   8 +-
 .../hadoop/MneDurableOutputSession.java         |  20 +-
 .../mnemonic/hadoop/mapred/MneInputFormat.java  |  50 +++++
 .../hadoop/mapred/MneMapredRecordReader.java    |  90 ++++++++
 .../hadoop/mapred/MneMapredRecordWriter.java    |  49 +++++
 .../mnemonic/hadoop/mapred/MneOutputFormat.java |  45 ++++
 .../mapred/MneMapredBufferDataTest.java         | 206 +++++++++++++++++++
 .../mapreduce/MneMapreduceBufferDataTest.java   |   2 +-
 .../mapreduce/MneMapreduceChunkDataTest.java    |   2 +-
 .../mapreduce/MneMapreduceLongDataTest.java     |   2 +-
 .../mapreduce/MneMapreducePersonDataTest.java   |   2 +-
 13 files changed, 472 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index cbaa909..e46fb9a 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -50,3 +50,6 @@ mvn -Dtest=MneMapreduceLongDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-map
 mvn -Dtest=MneMapreduceBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
 
 mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
+
+# a testcase for module "mnemonic-hadoop/mnemonic-hadoop-mapreduce" that requires 'pmalloc' memory service to pass
+mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
index b8974a4..e2b10fb 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
@@ -44,6 +44,7 @@ public class MneConfigHelper {
   public static final String DEFAULT_NAME_PART = "part";
   public static final String DEFAULT_FILE_EXTENSION = ".mne";
   public static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  public static final String DIR = "dir";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MneConfigHelper.class);
 
@@ -129,5 +130,13 @@ public class MneConfigHelper {
   public static long getMemPoolSize(Configuration conf, String prefix) {
     return conf.getLong(getConfigName(prefix, MEM_POOL_SIZE), DEFAULT_OUTPUT_MEM_POOL_SIZE);
   }
+  
+  public static String getDir(Configuration conf, String prefix) {
+    return conf.get(getConfigName(prefix, DIR));
+  }
+
+  public static void setDir(Configuration conf, String prefix, String dirname) {
+    conf.set(getConfigName(prefix, DIR), dirname);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 2d7f4c3..469dc7a 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -46,8 +46,8 @@ public class MneDurableInputSession<V>
 
 
   public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
+    this(taskAttemptContext.getConfiguration());
     setTaskAttemptContext(taskAttemptContext);
-    setConfiguration(taskAttemptContext.getConfiguration());
   }
 
   public MneDurableInputSession(Configuration configuration) {
@@ -67,10 +67,10 @@ public class MneDurableInputSession<V>
 
   @Override
   public void readConfig(String prefix) {
-    if (getConfiguration() == null) {
-      throw new ConfigurationException("configuration has not yet been set");
-    }
     Configuration conf = getConfiguration();
+    if (conf == null) {
+      throw new ConfigurationException("Configuration has not yet been set");
+    }
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index de26b14..71bc861 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class MneDurableOutputSession<V>
     implements MneOutputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
@@ -63,13 +62,13 @@ public class MneDurableOutputSession<V>
   protected Iterator<V> m_iter;
 
   public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
+    this(taskAttemptContext.getConfiguration());
     setTaskAttemptContext(taskAttemptContext);
-    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
-    setConfiguration(taskAttemptContext.getConfiguration());
   }
   
   public MneDurableOutputSession(Configuration configuration) {
     setConfiguration(configuration);
+    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
   }
 
   public void validateConfig() {
@@ -85,10 +84,10 @@ public class MneDurableOutputSession<V>
 
   @Override
   public void readConfig(String prefix) {
-    if (getTaskAttemptContext() == null) {
-      throw new ConfigurationException("taskAttemptContext has not yet been set");
-    }
     Configuration conf = getConfiguration();
+    if (conf == null) {
+      throw new ConfigurationException("Configuration has not yet been set");
+    }
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -101,12 +100,17 @@ public class MneDurableOutputSession<V>
   }
 
   protected Path genNextPoolPath() {
-    Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
+    Path ret = new Path(getOutputDir(),
         getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), 
             MneConfigHelper.DEFAULT_FILE_EXTENSION));
     return ret;
   }
-  
+
+  protected Path getOutputDir() {
+    String name = MneConfigHelper.getDir(getConfiguration(), MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    return name == null ? null : new Path(name);
+  }
+
   protected String getUniqueName(String name, String extension) {
     int partition;
     

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
new file mode 100644
index 0000000..61ed385
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * A Mnemonic input format that satisfies the org.apache.hadoop.mapred API.
+ */
+
+public class MneInputFormat<MV extends MneDurableInputValue<V>, V>
+    extends FileInputFormat<NullWritable, MV> {
+
+    @Override
+    public RecordReader<NullWritable, MV> 
+        getRecordReader(InputSplit inputSpilt, 
+                    JobConf jobConf, 
+                    Reporter reporter
+                    ) throws IOException {
+      MneMapredRecordReader<MV, V> reader = 
+            new MneMapredRecordReader<MV, V>((FileSplit) inputSpilt, jobConf);
+      return reader;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
new file mode 100644
index 0000000..54db99f
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapred API.
+ *
+ * @param <V>
+ *          the type of the data item
+ */
+
+public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
+    
+    protected Iterator<V> m_iter;
+    protected MneDurableInputSession<V> m_session;
+    protected FileSplit m_fileSplit;
+
+  
+    public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
+        m_fileSplit = fileSplit;
+        m_session = new MneDurableInputSession<V>(conf);
+        m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+        m_session.initialize(m_fileSplit.getPath());
+        m_iter = m_session.iterator();
+    }
+    
+    @Override
+    public boolean next(NullWritable key, MV value) throws IOException {
+      boolean ret = false;
+      if (m_iter.hasNext()) {
+        value.of(m_iter.next());
+        ret = true;
+      }
+      return ret;
+    }
+
+    @Override
+    public NullWritable createKey() {
+        return NullWritable.get();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public MV createValue() {
+        return (MV) new MneDurableInputValue<V>(m_session);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return m_fileSplit.getLength();
+    }
+
+    @Override
+    public void close() throws IOException {
+        m_session.close();        
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        return 0.5f; /* TBD */
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
new file mode 100644
index 0000000..cfd27f9
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * This record writer implements the org.apache.hadoop.mapred API.
+ *
+ * @param <MV>
+ *          the type of the durable output value being written
+ */
+
+public class MneMapredRecordWriter<MV extends MneDurableOutputValue<?>>
+    implements RecordWriter<NullWritable, MV> {
+    
+
+    @Override
+    public void write(NullWritable key, MV value) throws IOException {
+        value.post();
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
new file mode 100644
index 0000000..be0b997
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * A Mnemonic output format that satisfies the org.apache.hadoop.mapred API.
+ */
+
+public class MneOutputFormat<MV extends MneDurableOutputValue<?>> extends FileOutputFormat<NullWritable, MV> {
+
+    @Override
+    public RecordWriter<NullWritable, MV> getRecordWriter(FileSystem ignored, JobConf job, String name,
+            Progressable progress) throws IOException {
+        return new MneMapredRecordWriter<MV>();
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
new file mode 100644
index 0000000..c178ee4
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.mnemonic.DurableBuffer;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MneMapredBufferDataTest {
+
+  private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "mapred-buffer-data";
+  private static final String SERVICE_NAME = "pmalloc";
+  private static final long SLOT_KEY_ID = 5L;
+  private static final int TASK_PARTITION = 0;
+  
+  private Path m_workdir;
+  private JobConf m_conf;
+  private FileSystem m_fs;
+  private Random m_rand;
+  private long m_reccnt = 5000L;
+  private volatile long m_checksum;
+  private volatile long m_totalsize = 0L;
+  private List<String> m_partfns;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    m_workdir = new Path(
+        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+    m_conf = new JobConf();
+    m_rand = Utils.createRandom();
+    m_partfns = new ArrayList<String>();
+
+    try {
+      m_fs = FileSystem.getLocal(m_conf).getRaw();
+      m_fs.delete(m_workdir, true);
+      m_fs.mkdirs(m_workdir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+
+    m_conf.setInt(JobContext.TASK_PARTITION, TASK_PARTITION);
+    
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
+    MneConfigHelper.setBaseOutputName(m_conf, null, "mapred-buffer-data");
+
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {});
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setMemPoolSize(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {});
+  }
+
+  @AfterClass
+  public void tearDown() {
+
+  }
+
+  protected DurableBuffer<?> genupdDurableBuffer(
+      MneDurableOutputSession<DurableBuffer<?>> s, Checksum cs) {
+    DurableBuffer<?> ret = null;
+    int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024;
+    ret = s.newDurableObjectRecord(sz);
+    if (null != ret) {
+      ret.get().clear();
+      byte[] rdbytes = RandomUtils.nextBytes(sz);
+      Assert.assertNotNull(rdbytes);
+      ret.get().put(rdbytes);
+      cs.update(rdbytes, 0, rdbytes.length);
+      m_totalsize += sz;
+    }
+    return ret;
+  }
+
+  @Test(enabled = true)
+  public void testWriteBufferData() throws Exception {
+    NullWritable nada = NullWritable.get();
+    MneDurableOutputSession<DurableBuffer<?>> sess = new MneDurableOutputSession<DurableBuffer<?>>(m_conf);
+    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    sess.initNextPool();
+    MneDurableOutputValue<DurableBuffer<?>> mdvalue =
+        new MneDurableOutputValue<DurableBuffer<?>>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
+        new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
+        outputFormat.getRecordWriter(m_fs, m_conf, null, null);
+    DurableBuffer<?> dbuf = null;
+    Checksum cs = new CRC32();
+    cs.reset();
+    for (int i = 0; i < m_reccnt; ++i) {
+      dbuf = genupdDurableBuffer(sess, cs);
+      Assert.assertNotNull(dbuf);
+      writer.write(nada, mdvalue.of(dbuf));
+    }
+    m_checksum = cs.getValue();
+    writer.close(null);
+    sess.close();
+  }
+
+  @Test(enabled = true, dependsOnMethods = { "testWriteBufferData" })
+  public void testReadBufferData() throws Exception {
+    long reccnt = 0L;
+    long tsize = 0L;
+    byte[] buf;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles();
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        m_partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(m_partfns); // keep the order for checksum
+    for (int idx = 0; idx < m_partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+      FileSplit split = new FileSplit(
+          new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+      InputFormat<NullWritable, MneDurableInputValue<DurableBuffer<?>>> inputFormat =
+          new MneInputFormat<MneDurableInputValue<DurableBuffer<?>>, DurableBuffer<?>>();
+      RecordReader<NullWritable, MneDurableInputValue<DurableBuffer<?>>> reader =
+          inputFormat.getRecordReader((InputSplit) split, m_conf, null);
+      NullWritable dbufkey = reader.createKey();
+      MneDurableInputValue<DurableBuffer<?>> dbufval = null;
+      while (true) {
+        dbufval = reader.createValue();
+        if (reader.next(dbufkey, dbufval)) {
+          assert dbufval.getValue().getSize() == dbufval.getValue().get().capacity();
+          dbufval.getValue().get().clear();
+          buf = new byte[dbufval.getValue().get().capacity()];
+          dbufval.getValue().get().get(buf);
+          cs.update(buf, 0, buf.length);
+          tsize += dbufval.getValue().getSize();
+          ++reccnt;
+        } else {
+          break;
+        }
+      }
+      reader.close();
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of buffer is %d", m_checksum));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
index e29a8c8..7b65661 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
@@ -92,7 +92,7 @@ public class MneMapreduceBufferDataTest {
     m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
     m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
 
-    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
     MneConfigHelper.setBaseOutputName(m_conf, null, "buffer-data");
 
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
index 1dfe7ad..4a007d2 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
@@ -96,7 +96,7 @@ public class MneMapreduceChunkDataTest {
     m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
     m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
 
-    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
     MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
 
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
index bc6c2de..18a9e81 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
@@ -81,7 +81,7 @@ public class MneMapreduceLongDataTest {
     m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
     m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
 
-    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
     MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
 
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 635ed38..5d74838 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -81,7 +81,7 @@ public class MneMapreducePersonDataTest {
     m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
     m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
 
-    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
     MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
 
     MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);