You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/21 19:27:32 UTC
incubator-mnemonic git commit: MNEMONIC-227: Unify the usage of
initNextPool() of IO sessions
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 8923b1eeb -> 640becdde
MNEMONIC-227: Unify the usage of initNextPool() of IO sessions
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/640becdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/640becdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/640becdd
Branch: refs/heads/master
Commit: 640becdde6e14a40eeef1852a74534f640be14f0
Parents: 8923b1e
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Tue Mar 21 10:46:47 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Tue Mar 21 12:15:11 2017 -0700
----------------------------------------------------------------------
.../mnemonic/hadoop/MneDurableInputSession.java | 49 ++++++++++++--
.../hadoop/MneDurableOutputSession.java | 10 ++-
.../hadoop/mapred/MneMapredRecordReader.java | 4 +-
.../mapreduce/MneMapreduceRecordReader.java | 4 +-
.../mapreduce/MneMapreducePersonDataTest.java | 2 +-
.../mnemonic/sessions/DurableInputSession.java | 71 +++++++++++++++++---
.../mnemonic/sessions/DurableOutputSession.java | 34 +++++-----
.../apache/mnemonic/sessions/InputSession.java | 9 ++-
.../apache/mnemonic/sessions/OutputSession.java | 7 ++
9 files changed, 149 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 028b156..dc197ba 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -18,6 +18,13 @@
package org.apache.mnemonic.hadoop;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -25,6 +32,8 @@ import org.apache.mnemonic.ConfigurationException;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.NonVolatileMemAllocator;
import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.collections.DurableSinglyLinkedList;
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
import org.apache.mnemonic.sessions.DurableInputSession;
public class MneDurableInputSession<V>
@@ -32,14 +41,21 @@ public class MneDurableInputSession<V>
private TaskAttemptContext taskAttemptContext;
private Configuration configuration;
+ private Iterator<String> m_fp_iter;
- public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
- this(taskAttemptContext.getConfiguration());
+ public MneDurableInputSession(TaskAttemptContext taskAttemptContext, Path path) {
+ this(taskAttemptContext.getConfiguration(), path);
setTaskAttemptContext(taskAttemptContext);
}
- public MneDurableInputSession(Configuration configuration) {
+ public MneDurableInputSession(Configuration configuration, Path path) {
setConfiguration(configuration);
+ if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
+ throw new UnsupportedOperationException();
+ }
+ List<String> fpathlist = new ArrayList<String>();
+ fpathlist.add(path.toString());
+ m_fp_iter = fpathlist.iterator();
}
public void validateConfig() {
@@ -66,10 +82,29 @@ public class MneDurableInputSession<V>
validateConfig();
}
- public void initialize(Path path) {
- m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
- path.toString(), true);
- m_handler = m_act.getHandler(getSlotKeyId());
+ @Override
+ public boolean initNextPool() {
+ boolean ret = false;
+ if (m_act != null) {
+ m_act.close();
+ m_act = null;
+ }
+ if (null != m_fp_iter && m_fp_iter.hasNext()) {
+ m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
+ m_fp_iter.next(), true);
+ if (null != m_act) {
+ m_handler = m_act.getHandler(getSlotKeyId());
+ if (0L != m_handler) {
+ DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
+ m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
+ if (null != dsllist) {
+ m_iter = dsllist.iterator();
+ ret = null != m_iter;
+ }
+ }
+ }
+ }
+ return ret;
}
public TaskAttemptContext getTaskAttemptContext() {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index 37e42e1..a9f0c62 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -122,14 +122,20 @@ public class MneDurableOutputSession<V>
}
@Override
- public void initNextPool() {
+ public boolean initNextPool() {
+ boolean ret = false;
if (m_act != null) {
m_act.close();
+ m_act = null;
}
setOutputPath(genNextPoolPath());
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), getPoolSize(),
getOutputPath().toString(), true);
- m_newpool = true;
+ if (null != m_act) {
+ m_newpool = true;
+ ret = true;
+ }
+ return ret;
}
public Path getOutputPath() {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index 54db99f..4a63f57 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -45,9 +45,9 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
m_fileSplit = fileSplit;
- m_session = new MneDurableInputSession<V>(conf);
+ m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
- m_session.initialize(m_fileSplit.getPath());
+ m_session.initNextPool();
m_iter = m_session.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index 8a2c599..45785dc 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -48,9 +48,9 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
FileSplit split = (FileSplit) inputSplit;
- m_session = new MneDurableInputSession<V>(context);
+ m_session = new MneDurableInputSession<V>(context, split.getPath());
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
- m_session.initialize(split.getPath());
+ m_session.initNextPool();
m_iter = m_session.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 5d74838..bf2b961 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -156,8 +156,8 @@ public class MneMapreducePersonDataTest {
reader.close();
}
}
- AssertJUnit.assertEquals(m_sumage, sumage);
AssertJUnit.assertEquals(m_reccnt, reccnt);
+ AssertJUnit.assertEquals(m_sumage, sumage);
System.out.println(String.format("The checksum of ages is %d", sumage));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
index e565e17..00e8d79 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
@@ -19,12 +19,11 @@
package org.apache.mnemonic.sessions;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.EntityFactoryProxy;
import org.apache.mnemonic.RestorableAllocator;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
-import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
implements InputSession<V>, DurableComputable<A> {
@@ -36,20 +35,72 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
protected long m_handler;
protected A m_act;
-
+ protected Iterator<V> m_iter;
+
+ /**
+ * One session can only manage one iterator instance at a time for the simplicity
+ *
+ * @return the singleton iterator
+ *
+ */
@Override
public Iterator<V> iterator() {
- Iterator<V> iter;
- DurableSinglyLinkedList<V> dsllist;
- dsllist = DurableSinglyLinkedListFactory.restore(m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler,
- false);
- iter = dsllist.iterator();
- return iter;
+ return new Intr();
+ }
+
+ /**
+ * this class defines a iterator for multiple pools read
+ *
+ */
+ private class Intr implements Iterator<V> {
+
+ /**
+ * determine the existing of next
+ *
+ * @return true if there is a next node
+ *
+ */
+ @Override
+ public boolean hasNext() {
+ if (null == m_iter) {
+ return false;
+ }
+ boolean ret = m_iter.hasNext();
+ if (!ret) {
+ if (initNextPool()) {
+ ret = m_iter.hasNext();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * get next node
+ *
+ * @return the next node
+ */
+ @Override
+ public V next() {
+ if (null == m_iter) {
+ throw new NoSuchElementException();
+ }
+ return m_iter.next();
+ }
+
+ /**
+ * override remove()
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
}
@Override
public void close() {
- m_act.close();
+ if (null != m_act) {
+ m_act.close();
+ }
}
public String getServiceName() {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
index 7289162..258ab72 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
@@ -46,8 +46,6 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
protected DurableSinglyLinkedList<V> m_listnode;
protected A m_act;
- protected abstract void initNextPool();
-
@Override
public A getAllocator() {
return m_act;
@@ -119,16 +117,17 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
if (ret != null) {
((Durable) ret).destroy();
}
- initNextPool();
- try { /* retry */
- nv = createDurableNode();
- ret = createDurableObjectRecord(size);
- } catch (OutOfHybridMemory ee) {
- if (nv != null) {
- nv.destroy();
- }
- if (ret != null) {
- ((Durable) ret).destroy();
+ if (initNextPool()) {
+ try { /* retry */
+ nv = createDurableNode();
+ ret = createDurableObjectRecord(size);
+ } catch (OutOfHybridMemory ee) {
+ if (nv != null) {
+ nv.destroy();
+ }
+ if (ret != null) {
+ ((Durable) ret).destroy();
+ }
}
}
}
@@ -168,8 +167,9 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
try {
nv = createDurableNode();
} catch (OutOfHybridMemory e) {
- initNextPool();
- nv = createDurableNode();
+ if (initNextPool()) {
+ nv = createDurableNode();
+ }
}
break;
}
@@ -201,8 +201,10 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
@Override
public void close() {
- destroyAllPendingRecords();
- m_act.close();
+ if (null != m_act) {
+ destroyAllPendingRecords();
+ m_act.close();
+ }
}
public long getSlotKeyId() {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
index e2cfde1..e5e03a5 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
@@ -21,7 +21,14 @@ package org.apache.mnemonic.sessions;
import java.io.Closeable;
import java.util.Iterator;
-public interface InputSession<V> extends Closeable {
+public interface InputSession<V> extends Closeable, Iterable<V> {
+
+ /**
+ * Initialize the next pool, must be called before use
+ *
+ * @return true if success
+ */
+ boolean initNextPool();
Iterator<V> iterator();
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
index 598becc..19d1eaa 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
@@ -22,6 +22,13 @@ import java.io.Closeable;
public interface OutputSession<V> extends Closeable {
+ /**
+ * Initialize the next pool, must be called before use
+ *
+ * @return true if success
+ */
+ boolean initNextPool();
+
V newDurableObjectRecord();
V newDurableObjectRecord(long size);