You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/21 19:27:32 UTC

incubator-mnemonic git commit: MNEMONIC-227: Unify the usage of initNextPool() of IO sessions

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 8923b1eeb -> 640becdde


MNEMONIC-227: Unify the usage of initNextPool() of IO sessions


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/640becdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/640becdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/640becdd

Branch: refs/heads/master
Commit: 640becdde6e14a40eeef1852a74534f640be14f0
Parents: 8923b1e
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Tue Mar 21 10:46:47 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Tue Mar 21 12:15:11 2017 -0700

----------------------------------------------------------------------
 .../mnemonic/hadoop/MneDurableInputSession.java | 49 ++++++++++++--
 .../hadoop/MneDurableOutputSession.java         | 10 ++-
 .../hadoop/mapred/MneMapredRecordReader.java    |  4 +-
 .../mapreduce/MneMapreduceRecordReader.java     |  4 +-
 .../mapreduce/MneMapreducePersonDataTest.java   |  2 +-
 .../mnemonic/sessions/DurableInputSession.java  | 71 +++++++++++++++++---
 .../mnemonic/sessions/DurableOutputSession.java | 34 +++++-----
 .../apache/mnemonic/sessions/InputSession.java  |  9 ++-
 .../apache/mnemonic/sessions/OutputSession.java |  7 ++
 9 files changed, 149 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 028b156..dc197ba 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -18,6 +18,13 @@
 
 package org.apache.mnemonic.hadoop;
 
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -25,6 +32,8 @@ import org.apache.mnemonic.ConfigurationException;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.NonVolatileMemAllocator;
 import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.collections.DurableSinglyLinkedList;
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 import org.apache.mnemonic.sessions.DurableInputSession;
 
 public class MneDurableInputSession<V>
@@ -32,14 +41,21 @@ public class MneDurableInputSession<V>
 
   private TaskAttemptContext taskAttemptContext;
   private Configuration configuration;
+  private Iterator<String> m_fp_iter;
 
-  public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
-    this(taskAttemptContext.getConfiguration());
+  public MneDurableInputSession(TaskAttemptContext taskAttemptContext, Path path) {
+    this(taskAttemptContext.getConfiguration(), path);
     setTaskAttemptContext(taskAttemptContext);
   }
 
-  public MneDurableInputSession(Configuration configuration) {
+  public MneDurableInputSession(Configuration configuration, Path path) {
     setConfiguration(configuration);
+    if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
+      throw new UnsupportedOperationException();
+    }
+    List<String> fpathlist = new ArrayList<String>();
+    fpathlist.add(path.toString());
+    m_fp_iter = fpathlist.iterator();
   }
 
   public void validateConfig() {
@@ -66,10 +82,29 @@ public class MneDurableInputSession<V>
     validateConfig();
   }
 
-  public void initialize(Path path) {
-    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
-        path.toString(), true);
-    m_handler = m_act.getHandler(getSlotKeyId());
+  @Override
+  public boolean initNextPool() {
+    boolean ret = false;
+    if (m_act != null) {
+      m_act.close();
+      m_act = null;
+    }
+    if (null != m_fp_iter && m_fp_iter.hasNext()) {
+      m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
+          m_fp_iter.next(), true);
+      if (null != m_act) {
+        m_handler = m_act.getHandler(getSlotKeyId());
+        if (0L != m_handler) {
+          DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
+              m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
+          if (null != dsllist) {
+            m_iter = dsllist.iterator();
+            ret = null != m_iter;
+          }
+        }
+      }
+    }
+    return ret;
   }
 
   public TaskAttemptContext getTaskAttemptContext() {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index 37e42e1..a9f0c62 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -122,14 +122,20 @@ public class MneDurableOutputSession<V>
   }
 
   @Override
-  public void initNextPool() {
+  public boolean initNextPool() {
+    boolean ret = false;
     if (m_act != null) {
       m_act.close();
+      m_act = null;
     }
     setOutputPath(genNextPoolPath());
     m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), getPoolSize(),
         getOutputPath().toString(), true);
-    m_newpool = true;
+    if (null != m_act) {
+      m_newpool = true;
+      ret = true;
+    }
+    return ret;
   }
 
   public Path getOutputPath() {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index 54db99f..4a63f57 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -45,9 +45,9 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
   
     public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
         m_fileSplit = fileSplit;
-        m_session = new MneDurableInputSession<V>(conf);
+        m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
         m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-        m_session.initialize(m_fileSplit.getPath());
+        m_session.initNextPool();
         m_iter = m_session.iterator();
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index 8a2c599..45785dc 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -48,9 +48,9 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
     FileSplit split = (FileSplit) inputSplit;
-    m_session = new MneDurableInputSession<V>(context);
+    m_session = new MneDurableInputSession<V>(context, split.getPath());
     m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-    m_session.initialize(split.getPath());
+    m_session.initNextPool();
     m_iter = m_session.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 5d74838..bf2b961 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -156,8 +156,8 @@ public class MneMapreducePersonDataTest {
         reader.close();
       }
     }
-    AssertJUnit.assertEquals(m_sumage, sumage);
     AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_sumage, sumage);
     System.out.println(String.format("The checksum of ages is %d", sumage));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
index e565e17..00e8d79 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
@@ -19,12 +19,11 @@
 package org.apache.mnemonic.sessions;
 
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.EntityFactoryProxy;
 import org.apache.mnemonic.RestorableAllocator;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
-import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 
 public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
     implements InputSession<V>, DurableComputable<A> {
@@ -36,20 +35,72 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
 
   protected long m_handler;
   protected A m_act;
-
+  protected Iterator<V> m_iter;
+
+  /**
+   * For simplicity, one session can only manage one iterator instance at a time
+   *
+   * @return the singleton iterator
+   *
+   */
   @Override
   public Iterator<V> iterator() {
-    Iterator<V> iter;
-    DurableSinglyLinkedList<V> dsllist;
-    dsllist = DurableSinglyLinkedListFactory.restore(m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler,
-        false);
-    iter = dsllist.iterator();
-    return iter;
+    return new Intr();
+  }
+
+  /**
+   * this class defines an iterator for reading across multiple pools
+   *
+   */
+  private class Intr implements Iterator<V> {
+
+    /**
+     * determine whether a next node exists
+     *
+     * @return true if there is a next node
+     *
+     */
+    @Override
+    public boolean hasNext() {
+      if (null == m_iter) {
+        return false;
+      }
+      boolean ret = m_iter.hasNext();
+      if (!ret) {
+        if (initNextPool()) {
+          ret = m_iter.hasNext();
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * get next node
+     *
+     * @return the next node
+     */
+    @Override
+    public V next() {
+      if (null == m_iter) {
+        throw new NoSuchElementException();
+      }
+      return m_iter.next();
+    }
+
+    /**
+     * override remove()
+     */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   @Override
   public void close() {
-    m_act.close();
+    if (null != m_act) {
+      m_act.close();
+    }
   }
 
   public String getServiceName() {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
index 7289162..258ab72 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
@@ -46,8 +46,6 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
   protected DurableSinglyLinkedList<V> m_listnode;
   protected A m_act;
 
-  protected abstract void initNextPool();
-
   @Override
   public A getAllocator() {
     return m_act;
@@ -119,16 +117,17 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
       if (ret != null) {
         ((Durable) ret).destroy();
       }
-      initNextPool();
-      try { /* retry */
-        nv = createDurableNode();
-        ret = createDurableObjectRecord(size);
-      } catch (OutOfHybridMemory ee) {
-        if (nv != null) {
-          nv.destroy();
-        }
-        if (ret != null) {
-          ((Durable) ret).destroy();
+      if (initNextPool()) {
+        try { /* retry */
+          nv = createDurableNode();
+          ret = createDurableObjectRecord(size);
+        } catch (OutOfHybridMemory ee) {
+          if (nv != null) {
+            nv.destroy();
+          }
+          if (ret != null) {
+            ((Durable) ret).destroy();
+          }
         }
       }
     }
@@ -168,8 +167,9 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
       try {
         nv = createDurableNode();
       } catch (OutOfHybridMemory e) {
-        initNextPool();
-        nv = createDurableNode();
+        if (initNextPool()) {
+          nv = createDurableNode();
+        }
       }
       break;
     }
@@ -201,8 +201,10 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
 
   @Override
   public void close() {
-    destroyAllPendingRecords();
-    m_act.close();
+    if (null != m_act) {
+      destroyAllPendingRecords();
+      m_act.close();
+    }
   }
 
   public long getSlotKeyId() {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
index e2cfde1..e5e03a5 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
@@ -21,7 +21,14 @@ package org.apache.mnemonic.sessions;
 import java.io.Closeable;
 import java.util.Iterator;
 
-public interface InputSession<V> extends Closeable {
+public interface InputSession<V> extends Closeable, Iterable<V> {
+
+  /**
+   * Initialize the next pool; must be called before use
+   *
+   * @return true on success
+   */
+  boolean initNextPool();
 
   Iterator<V> iterator();
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/640becdd/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
index 598becc..19d1eaa 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
@@ -22,6 +22,13 @@ import java.io.Closeable;
 
 public interface OutputSession<V> extends Closeable {
 
+  /**
+   * Initialize the next pool; must be called before use
+   *
+   * @return true on success
+   */
+  boolean initNextPool();
+
   V newDurableObjectRecord();
 
   V newDurableObjectRecord(long size);