You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/23 21:09:45 UTC
incubator-mnemonic git commit: MNEMONIC-232: Improve the MneDurableInputSession to accept multiple paths
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 77997865e -> 472072e18
MNEMONIC-232: Improve the MneDurableInputSession to accept multiple paths
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/472072e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/472072e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/472072e1
Branch: refs/heads/master
Commit: 472072e180dc9fd6ebcc3043da99ee46e1886cc6
Parents: 7799786
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Thu Mar 23 12:59:45 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Thu Mar 23 13:25:33 2017 -0700
----------------------------------------------------------------------
.../mnemonic/hadoop/MneDurableInputSession.java | 16 +++---
.../hadoop/mapred/MneMapredRecordReader.java | 3 +-
.../mapreduce/MneMapreduceRecordReader.java | 3 +-
.../mapreduce/MneMapreduceChunkDataTest.java | 58 +++++++++++++++++---
4 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 0088873..7d2f77f 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -45,7 +45,7 @@ public class MneDurableInputSession<V>
private Iterator<String> m_fp_iter;
public MneDurableInputSession(TaskAttemptContext taskAttemptContext,
- Configuration configuration, Path path, String prefix) {
+ Configuration configuration, Path[] paths, String prefix) {
if (null == taskAttemptContext && null == configuration) {
throw new ConfigurationException("Session is not configured properly");
}
@@ -55,15 +55,17 @@ public class MneDurableInputSession<V>
} else {
setConfiguration(configuration);
}
- initialize(path, prefix);
+ initialize(paths, prefix);
}
- public void initialize(Path path, String prefix) {
- if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
- throw new UnsupportedOperationException();
- }
+ public void initialize(Path[] paths, String prefix) {
List<String> fpathlist = new ArrayList<String>();
- fpathlist.add(path.toString());
+ for (Path p : paths) {
+ if (!Files.isRegularFile(Paths.get(p.toString()), LinkOption.NOFOLLOW_LINKS)) {
+ throw new UnsupportedOperationException();
+ }
+ fpathlist.add(p.toString());
+ }
m_fp_iter = fpathlist.iterator();
readConfig(prefix);
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index ef3e4cc..27d8044 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.mnemonic.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -46,7 +47,7 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
m_fileSplit = fileSplit;
m_session = new MneDurableInputSession<V>(null, conf,
- m_fileSplit.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+ new Path[]{m_fileSplit.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
m_iter = m_session.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index f5c0a75..dc0a76d 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.mnemonic.hadoop.mapreduce;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -51,7 +52,7 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
FileSplit split = (FileSplit) inputSplit;
m_session = new MneDurableInputSession<V>(context, null,
- split.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+ new Path[]{split.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
m_iter = m_session.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/472072e1/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
index f4e2061..cb70bd6 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
@@ -44,11 +44,13 @@ import org.apache.mnemonic.DurableChunk;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.Utils;
import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
import org.apache.mnemonic.hadoop.MneDurableOutputSession;
import org.apache.mnemonic.hadoop.MneDurableOutputValue;
import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.sessions.SessionIterator;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
@@ -73,7 +75,6 @@ public class MneMapreduceChunkDataTest {
private long m_reccnt = 5000L;
private volatile long m_checksum;
private volatile long m_totalsize = 0L;
- private List<String> m_partfns;
private Unsafe unsafe;
@BeforeClass
@@ -82,7 +83,6 @@ public class MneMapreduceChunkDataTest {
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
- m_partfns = new ArrayList<String>();
unsafe = Utils.getUnsafe();
try {
@@ -164,6 +164,7 @@ public class MneMapreduceChunkDataTest {
@Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
public void testReadChunkData() throws Exception {
+ List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
@@ -174,14 +175,14 @@ public class MneMapreduceChunkDataTest {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
- m_partfns.add(listfiles[idx].getName());
+ partfns.add(listfiles[idx].getName());
}
}
- Collections.sort(m_partfns); // keep the order for checksum
- for (int idx = 0; idx < m_partfns.size(); ++idx) {
- System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+ Collections.sort(partfns); // keep the order for checksum
+ for (int idx = 0; idx < partfns.size(); ++idx) {
+ System.out.println(String.format("Verifying : %s", partfns.get(idx)));
FileSplit split = new FileSplit(
- new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+ new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
@@ -204,4 +205,47 @@ public class MneMapreduceChunkDataTest {
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}
+
+ @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
+ public void testBatchReadChunkDataUsingInputSession() throws Exception {
+ List<String> partfns = new ArrayList<String>();
+ long reccnt = 0L;
+ long tsize = 0L;
+ Checksum cs = new CRC32();
+ cs.reset();
+ File folder = new File(m_workdir.toString());
+ File[] listfiles = folder.listFiles();
+ for (int idx = 0; idx < listfiles.length; ++idx) {
+ if (listfiles[idx].isFile()
+ && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+ && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+ partfns.add(listfiles[idx].getName());
+ }
+ }
+ Collections.sort(partfns); // keep the order for checksum
+ List<Path> paths = new ArrayList<Path>();
+ for (String fns : partfns) {
+ paths.add(new Path(m_workdir, fns));
+ System.out.println(String.format("[Batch Mode] Added : %s", fns));
+ }
+ MneDurableInputSession<DurableChunk<?>> m_session =
+ new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
+ paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+ SessionIterator<DurableChunk<?>, ?> m_iter = m_session.iterator();
+ DurableChunk<?> val = null;
+ while (m_iter.hasNext()) {
+ val = m_iter.next();
+ byte b;
+ for (int j = 0; j < val.getSize(); ++j) {
+ b = unsafe.getByte(val.get() + j);
+ cs.update(b);
+ }
+ tsize += val.getSize();
+ ++reccnt;
+ }
+ AssertJUnit.assertEquals(m_reccnt, reccnt);
+ AssertJUnit.assertEquals(m_totalsize, tsize);
+ AssertJUnit.assertEquals(m_checksum, cs.getValue());
+ System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
+ }
}