You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/23 21:09:45 UTC

incubator-mnemonic git commit: MNEMONIC-232: Improve the MneDurableInputSession to accept multiple paths

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 77997865e -> 472072e18


MNEMONIC-232: Improve the MneDurableInputSession to accept multiple paths


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/472072e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/472072e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/472072e1

Branch: refs/heads/master
Commit: 472072e180dc9fd6ebcc3043da99ee46e1886cc6
Parents: 7799786
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Thu Mar 23 12:59:45 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Thu Mar 23 13:25:33 2017 -0700

----------------------------------------------------------------------
 .../mnemonic/hadoop/MneDurableInputSession.java | 16 +++---
 .../hadoop/mapred/MneMapredRecordReader.java    |  3 +-
 .../mapreduce/MneMapreduceRecordReader.java     |  3 +-
 .../mapreduce/MneMapreduceChunkDataTest.java    | 58 +++++++++++++++++---
 4 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 0088873..7d2f77f 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -45,7 +45,7 @@ public class MneDurableInputSession<V>
   private Iterator<String> m_fp_iter;
 
   public MneDurableInputSession(TaskAttemptContext taskAttemptContext,
-      Configuration configuration, Path path, String prefix) {
+      Configuration configuration, Path[] paths, String prefix) {
     if (null == taskAttemptContext && null == configuration) {
       throw new ConfigurationException("Session is not configured properly");
     }
@@ -55,15 +55,17 @@ public class MneDurableInputSession<V>
     } else {
       setConfiguration(configuration);
     }
-    initialize(path, prefix);
+    initialize(paths, prefix);
   }
 
-  public void initialize(Path path, String prefix) {
-    if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
-      throw new UnsupportedOperationException();
-    }
+  public void initialize(Path[] paths, String prefix) {
     List<String> fpathlist = new ArrayList<String>();
-    fpathlist.add(path.toString());
+    for (Path p : paths) {
+      if (!Files.isRegularFile(Paths.get(p.toString()), LinkOption.NOFOLLOW_LINKS)) {
+        throw new UnsupportedOperationException();
+      }
+      fpathlist.add(p.toString());
+    }
     m_fp_iter = fpathlist.iterator();
     readConfig(prefix);
   }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index ef3e4cc..27d8044 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.mnemonic.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -46,7 +47,7 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
     public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
         m_fileSplit = fileSplit;
         m_session = new MneDurableInputSession<V>(null, conf,
-            m_fileSplit.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+            new Path[]{m_fileSplit.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
         m_iter = m_session.iterator();
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index f5c0a75..dc0a76d 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.mnemonic.hadoop.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -51,7 +52,7 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
   public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
     FileSplit split = (FileSplit) inputSplit;
     m_session = new MneDurableInputSession<V>(context, null,
-        split.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+        new Path[]{split.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
     m_iter = m_session.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
index f4e2061..cb70bd6 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
@@ -44,11 +44,13 @@ import org.apache.mnemonic.DurableChunk;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
 import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.sessions.SessionIterator;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -73,7 +75,6 @@ public class MneMapreduceChunkDataTest {
   private long m_reccnt = 5000L;
   private volatile long m_checksum;
   private volatile long m_totalsize = 0L;
-  private List<String> m_partfns;
   private Unsafe unsafe;
 
   @BeforeClass
@@ -82,7 +83,6 @@ public class MneMapreduceChunkDataTest {
         System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
     m_conf = new JobConf();
     m_rand = Utils.createRandom();
-    m_partfns = new ArrayList<String>();
     unsafe = Utils.getUnsafe();
 
     try {
@@ -164,6 +164,7 @@ public class MneMapreduceChunkDataTest {
 
   @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
   public void testReadChunkData() throws Exception {
+    List<String> partfns = new ArrayList<String>();
     long reccnt = 0L;
     long tsize = 0L;
     Checksum cs = new CRC32();
@@ -174,14 +175,14 @@ public class MneMapreduceChunkDataTest {
       if (listfiles[idx].isFile()
           && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
           && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
-        m_partfns.add(listfiles[idx].getName());
+        partfns.add(listfiles[idx].getName());
       }
     }
-    Collections.sort(m_partfns); // keep the order for checksum
-    for (int idx = 0; idx < m_partfns.size(); ++idx) {
-      System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+    Collections.sort(partfns); // keep the order for checksum
+    for (int idx = 0; idx < partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", partfns.get(idx)));
       FileSplit split = new FileSplit(
-          new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+          new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
       InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
           new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
       RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
@@ -204,4 +205,47 @@ public class MneMapreduceChunkDataTest {
     AssertJUnit.assertEquals(m_checksum, cs.getValue());
     System.out.println(String.format("The checksum of chunk is %d", m_checksum));
   }
+
+  @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
+  public void testBatchReadChunkDataUsingInputSession() throws Exception {
+    List<String> partfns = new ArrayList<String>();
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles();
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(partfns); // keep the order for checksum
+    List<Path> paths = new ArrayList<Path>();
+    for (String fns : partfns) {
+      paths.add(new Path(m_workdir, fns));
+      System.out.println(String.format("[Batch Mode] Added : %s", fns));
+    }
+    MneDurableInputSession<DurableChunk<?>> m_session =
+        new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
+        paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    SessionIterator<DurableChunk<?>, ?> m_iter = m_session.iterator();
+    DurableChunk<?> val = null;
+    while (m_iter.hasNext()) {
+      val = m_iter.next();
+      byte b;
+      for (int j = 0; j < val.getSize(); ++j) {
+        b = unsafe.getByte(val.get() + j);
+        cs.update(b);
+      }
+      tsize += val.getSize();
+      ++reccnt;
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
+  }
 }