You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/23 00:29:56 UTC

incubator-mnemonic git commit: MNEMONIC-230: Make it possible to return the iterator of DurableInputSession multiple times.

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master e912470d0 -> 341156c9e


MNEMONIC-230: Make it possible to return the iterator of DurableInputSession multiple times.


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/341156c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/341156c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/341156c9

Branch: refs/heads/master
Commit: 341156c9ef5e55b7ef45d2509e1209147034ad5b
Parents: e912470
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Mar 22 17:21:05 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Mar 22 17:25:35 2017 -0700

----------------------------------------------------------------------
 .../org/apache/mnemonic/CloseableIterator.java  | 25 +++++++
 .../mnemonic/hadoop/MneDurableInputSession.java | 25 +++----
 .../hadoop/mapred/MneMapredRecordReader.java    |  9 +--
 .../mapreduce/MneMapreduceRecordReader.java     |  9 +--
 .../mnemonic/sessions/DurableInputSession.java  | 76 ++++++++++++++------
 .../apache/mnemonic/sessions/InputSession.java  | 10 +--
 .../mnemonic/sessions/SessionIterator.java      | 36 ++++++++++
 7 files changed, 138 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
new file mode 100644
index 0000000..69cf253
--- /dev/null
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface CloseableIterator<E> extends Iterator<E>, Closeable {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index dc197ba..db616cf 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -35,6 +35,7 @@ import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 import org.apache.mnemonic.sessions.DurableInputSession;
+import org.apache.mnemonic.sessions.SessionIterator;
 
 public class MneDurableInputSession<V>
     extends DurableInputSession<V, NonVolatileMemAllocator> {
@@ -83,23 +84,23 @@ public class MneDurableInputSession<V>
   }
 
   @Override
-  public boolean initNextPool() {
+  protected boolean initNextPool(SessionIterator<V, NonVolatileMemAllocator> sessiter) {
     boolean ret = false;
-    if (m_act != null) {
-      m_act.close();
-      m_act = null;
+    if (sessiter.getAllocator() != null) {
+      sessiter.getAllocator().close();
+      sessiter.setAllocator(null);
     }
     if (null != m_fp_iter && m_fp_iter.hasNext()) {
-      m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
-          m_fp_iter.next(), true);
-      if (null != m_act) {
-        m_handler = m_act.getHandler(getSlotKeyId());
-        if (0L != m_handler) {
+      sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
+          getServiceName()), 1024000L, m_fp_iter.next(), true));
+      if (null != sessiter.getAllocator()) {
+        sessiter.setHandler(sessiter.getAllocator().getHandler(getSlotKeyId()));
+        if (0L != sessiter.getHandler()) {
           DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
-              m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
+              sessiter.getAllocator(), getEntityFactoryProxies(), getDurableTypes(), sessiter.getHandler(), false);
           if (null != dsllist) {
-            m_iter = dsllist.iterator();
-            ret = null != m_iter;
+            sessiter.setIterator(dsllist.iterator());
+            ret = null != sessiter.getIterator();
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index 4a63f57..f834a29 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -19,11 +19,11 @@
 package org.apache.mnemonic.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.CloseableIterator;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
 import org.apache.mnemonic.hadoop.MneDurableInputSession;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -38,7 +38,7 @@ import org.apache.mnemonic.hadoop.MneDurableInputValue;
 public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
     implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
     
-    protected Iterator<V> m_iter;
+    protected CloseableIterator<V> m_iter;
     protected MneDurableInputSession<V> m_session;
     protected FileSplit m_fileSplit;
 
@@ -47,7 +47,6 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
         m_fileSplit = fileSplit;
         m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
         m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-        m_session.initNextPool();
         m_iter = m_session.iterator();
     }
     
@@ -79,7 +78,9 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
 
     @Override
     public void close() throws IOException {
-        m_session.close();        
+      if (null != m_iter) {
+        m_iter.close();
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index 45785dc..c91f573 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -18,12 +18,12 @@
 package org.apache.mnemonic.hadoop.mapreduce;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.mnemonic.CloseableIterator;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
 import org.apache.mnemonic.hadoop.MneDurableInputSession;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -37,12 +37,14 @@ import org.apache.mnemonic.hadoop.MneDurableInputValue;
 public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
     extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, MV> {
 
-  protected Iterator<V> m_iter;
+  protected CloseableIterator<V> m_iter;
   protected MneDurableInputSession<V> m_session;
 
   @Override
   public void close() throws IOException {
-    m_session.close();
+    if (null != m_iter) {
+      m_iter.close();
+    }
   }
 
   @Override
@@ -50,7 +52,6 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
     FileSplit split = (FileSplit) inputSplit;
     m_session = new MneDurableInputSession<V>(context, split.getPath());
     m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-    m_session.initNextPool();
     m_iter = m_session.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
index 00e8d79..ea87f1c 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
@@ -26,16 +26,19 @@ import org.apache.mnemonic.EntityFactoryProxy;
 import org.apache.mnemonic.RestorableAllocator;
 
 public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
-    implements InputSession<V>, DurableComputable<A> {
+    implements InputSession<V> {
 
   private String serviceName;
   private DurableType[] durableTypes;
   private EntityFactoryProxy[] entityFactoryProxies;
   private long slotKeyId;
 
-  protected long m_handler;
-  protected A m_act;
-  protected Iterator<V> m_iter;
+  /**
+   * Initialize the next pool, must be called before use
+   *
+   * @return true if success
+   */
+  protected abstract boolean initNextPool(SessionIterator<V, A> sessiter);
 
   /**
    * One session can only manage one iterator instance at a time for the simplicity
@@ -44,15 +47,21 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
    *
    */
   @Override
-  public Iterator<V> iterator() {
-    return new Intr();
+  public SessionIterator<V, A> iterator() {
+    SessionIterator<V, A> ret = new Intr();
+    initNextPool(ret);
+    return ret;
   }
 
   /**
    * this class defines a iterator for multiple pools read
    *
    */
-  private class Intr implements Iterator<V> {
+  private class Intr implements SessionIterator<V, A> {
+
+    protected long m_handler;
+    protected A m_act;
+    protected Iterator<V> m_iter;
 
     /**
      * determine the existing of next
@@ -67,7 +76,7 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
       }
       boolean ret = m_iter.hasNext();
       if (!ret) {
-        if (initNextPool()) {
+        if (initNextPool(this)) {
           ret = m_iter.hasNext();
         }
       }
@@ -94,13 +103,44 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
     public void remove() {
       throw new UnsupportedOperationException();
     }
-  }
 
-  @Override
-  public void close() {
-    if (null != m_act) {
-      m_act.close();
+    @Override
+    public A getAllocator() {
+      return m_act;
     }
+
+    @Override
+    public long getHandler() {
+      return m_handler;
+    }
+
+    @Override
+    public void setAllocator(A alloc) {
+      m_act = alloc;
+    }
+
+    @Override
+    public void setHandler(long hdl) {
+      m_handler = hdl;
+    }
+
+    @Override
+    public void setIterator(Iterator<V> iter) {
+      m_iter = iter;
+    }
+
+    @Override
+    public void close() {
+      if (null != m_act) {
+        m_act.close();
+      }
+    }
+
+    @Override
+    public Iterator<V> getIterator() {
+      return m_iter;
+    }
+
   }
 
   public String getServiceName() {
@@ -135,14 +175,4 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
     this.slotKeyId = slotKeyId;
   }
 
-  @Override
-  public A getAllocator() {
-    return m_act;
-  }
-
-  @Override
-  public long getHandler() {
-    return m_handler;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
index e5e03a5..810ff5e 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
@@ -18,17 +18,9 @@
 
 package org.apache.mnemonic.sessions;
 
-import java.io.Closeable;
 import java.util.Iterator;
 
-public interface InputSession<V> extends Closeable, Iterable<V> {
-
-  /**
-   * Initialize the next pool, must be called before use
-   *
-   * @return true if success
-   */
-  boolean initNextPool();
+public interface InputSession<V> extends Iterable<V> {
 
   Iterator<V> iterator();
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
new file mode 100644
index 0000000..c569633
--- /dev/null
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.sessions;
+
+import java.util.Iterator;
+
+import org.apache.mnemonic.CloseableIterator;
+import org.apache.mnemonic.RestorableAllocator;
+
+public interface SessionIterator<V, A extends RestorableAllocator<A>>
+    extends CloseableIterator<V>, DurableComputable<A> {
+
+  void setAllocator(A alloc);
+
+  void setHandler(long hdl);
+
+  void setIterator(Iterator<V> iter);
+
+  Iterator<V> getIterator();
+}