You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/23 00:29:56 UTC
incubator-mnemonic git commit: MNEMONIC-230: Make it possible to return the iterator of DurableInputSession multiple times.
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master e912470d0 -> 341156c9e
MNEMONIC-230: Make it possible to return the iterator of DurableInputSession multiple times.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/341156c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/341156c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/341156c9
Branch: refs/heads/master
Commit: 341156c9ef5e55b7ef45d2509e1209147034ad5b
Parents: e912470
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Mar 22 17:21:05 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Mar 22 17:25:35 2017 -0700
----------------------------------------------------------------------
.../org/apache/mnemonic/CloseableIterator.java | 25 +++++++
.../mnemonic/hadoop/MneDurableInputSession.java | 25 +++----
.../hadoop/mapred/MneMapredRecordReader.java | 9 +--
.../mapreduce/MneMapreduceRecordReader.java | 9 +--
.../mnemonic/sessions/DurableInputSession.java | 76 ++++++++++++++------
.../apache/mnemonic/sessions/InputSession.java | 10 +--
.../mnemonic/sessions/SessionIterator.java | 36 ++++++++++
7 files changed, 138 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
new file mode 100644
index 0000000..69cf253
--- /dev/null
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/CloseableIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface CloseableIterator<E> extends Iterator<E>, Closeable {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index dc197ba..db616cf 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -35,6 +35,7 @@ import org.apache.mnemonic.Utils;
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
import org.apache.mnemonic.sessions.DurableInputSession;
+import org.apache.mnemonic.sessions.SessionIterator;
public class MneDurableInputSession<V>
extends DurableInputSession<V, NonVolatileMemAllocator> {
@@ -83,23 +84,23 @@ public class MneDurableInputSession<V>
}
@Override
- public boolean initNextPool() {
+ protected boolean initNextPool(SessionIterator<V, NonVolatileMemAllocator> sessiter) {
boolean ret = false;
- if (m_act != null) {
- m_act.close();
- m_act = null;
+ if (sessiter.getAllocator() != null) {
+ sessiter.getAllocator().close();
+ sessiter.setAllocator(null);
}
if (null != m_fp_iter && m_fp_iter.hasNext()) {
- m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
- m_fp_iter.next(), true);
- if (null != m_act) {
- m_handler = m_act.getHandler(getSlotKeyId());
- if (0L != m_handler) {
+ sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
+ getServiceName()), 1024000L, m_fp_iter.next(), true));
+ if (null != sessiter.getAllocator()) {
+ sessiter.setHandler(sessiter.getAllocator().getHandler(getSlotKeyId()));
+ if (0L != sessiter.getHandler()) {
DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
- m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
+ sessiter.getAllocator(), getEntityFactoryProxies(), getDurableTypes(), sessiter.getHandler(), false);
if (null != dsllist) {
- m_iter = dsllist.iterator();
- ret = null != m_iter;
+ sessiter.setIterator(dsllist.iterator());
+ ret = null != sessiter.getIterator();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
index 4a63f57..f834a29 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -19,11 +19,11 @@
package org.apache.mnemonic.hadoop.mapred;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.CloseableIterator;
import org.apache.mnemonic.hadoop.MneConfigHelper;
import org.apache.mnemonic.hadoop.MneDurableInputSession;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -38,7 +38,7 @@ import org.apache.mnemonic.hadoop.MneDurableInputValue;
public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
- protected Iterator<V> m_iter;
+ protected CloseableIterator<V> m_iter;
protected MneDurableInputSession<V> m_session;
protected FileSplit m_fileSplit;
@@ -47,7 +47,6 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
m_fileSplit = fileSplit;
m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
- m_session.initNextPool();
m_iter = m_session.iterator();
}
@@ -79,7 +78,9 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
@Override
public void close() throws IOException {
- m_session.close();
+ if (null != m_iter) {
+ m_iter.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index 45785dc..c91f573 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -18,12 +18,12 @@
package org.apache.mnemonic.hadoop.mapreduce;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.mnemonic.CloseableIterator;
import org.apache.mnemonic.hadoop.MneConfigHelper;
import org.apache.mnemonic.hadoop.MneDurableInputSession;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -37,12 +37,14 @@ import org.apache.mnemonic.hadoop.MneDurableInputValue;
public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, MV> {
- protected Iterator<V> m_iter;
+ protected CloseableIterator<V> m_iter;
protected MneDurableInputSession<V> m_session;
@Override
public void close() throws IOException {
- m_session.close();
+ if (null != m_iter) {
+ m_iter.close();
+ }
}
@Override
@@ -50,7 +52,6 @@ public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
FileSplit split = (FileSplit) inputSplit;
m_session = new MneDurableInputSession<V>(context, split.getPath());
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
- m_session.initNextPool();
m_iter = m_session.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
index 00e8d79..ea87f1c 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java
@@ -26,16 +26,19 @@ import org.apache.mnemonic.EntityFactoryProxy;
import org.apache.mnemonic.RestorableAllocator;
public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
- implements InputSession<V>, DurableComputable<A> {
+ implements InputSession<V> {
private String serviceName;
private DurableType[] durableTypes;
private EntityFactoryProxy[] entityFactoryProxies;
private long slotKeyId;
- protected long m_handler;
- protected A m_act;
- protected Iterator<V> m_iter;
+ /**
+ * Initialize the next pool, must be called before use
+ *
+ * @return true if success
+ */
+ protected abstract boolean initNextPool(SessionIterator<V, A> sessiter);
/**
* One session can only manage one iterator instance at a time for the simplicity
@@ -44,15 +47,21 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
*
*/
@Override
- public Iterator<V> iterator() {
- return new Intr();
+ public SessionIterator<V, A> iterator() {
+ SessionIterator<V, A> ret = new Intr();
+ initNextPool(ret);
+ return ret;
}
/**
* this class defines a iterator for multiple pools read
*
*/
- private class Intr implements Iterator<V> {
+ private class Intr implements SessionIterator<V, A> {
+
+ protected long m_handler;
+ protected A m_act;
+ protected Iterator<V> m_iter;
/**
* determine the existing of next
@@ -67,7 +76,7 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
}
boolean ret = m_iter.hasNext();
if (!ret) {
- if (initNextPool()) {
+ if (initNextPool(this)) {
ret = m_iter.hasNext();
}
}
@@ -94,13 +103,44 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
public void remove() {
throw new UnsupportedOperationException();
}
- }
- @Override
- public void close() {
- if (null != m_act) {
- m_act.close();
+ @Override
+ public A getAllocator() {
+ return m_act;
}
+
+ @Override
+ public long getHandler() {
+ return m_handler;
+ }
+
+ @Override
+ public void setAllocator(A alloc) {
+ m_act = alloc;
+ }
+
+ @Override
+ public void setHandler(long hdl) {
+ m_handler = hdl;
+ }
+
+ @Override
+ public void setIterator(Iterator<V> iter) {
+ m_iter = iter;
+ }
+
+ @Override
+ public void close() {
+ if (null != m_act) {
+ m_act.close();
+ }
+ }
+
+ @Override
+ public Iterator<V> getIterator() {
+ return m_iter;
+ }
+
}
public String getServiceName() {
@@ -135,14 +175,4 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
this.slotKeyId = slotKeyId;
}
- @Override
- public A getAllocator() {
- return m_act;
- }
-
- @Override
- public long getHandler() {
- return m_handler;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
index e5e03a5..810ff5e 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java
@@ -18,17 +18,9 @@
package org.apache.mnemonic.sessions;
-import java.io.Closeable;
import java.util.Iterator;
-public interface InputSession<V> extends Closeable, Iterable<V> {
-
- /**
- * Initialize the next pool, must be called before use
- *
- * @return true if success
- */
- boolean initNextPool();
+public interface InputSession<V> extends Iterable<V> {
Iterator<V> iterator();
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/341156c9/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
new file mode 100644
index 0000000..c569633
--- /dev/null
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/SessionIterator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.sessions;
+
+import java.util.Iterator;
+
+import org.apache.mnemonic.CloseableIterator;
+import org.apache.mnemonic.RestorableAllocator;
+
+public interface SessionIterator<V, A extends RestorableAllocator<A>>
+ extends CloseableIterator<V>, DurableComputable<A> {
+
+ void setAllocator(A alloc);
+
+ void setHandler(long hdl);
+
+ void setIterator(Iterator<V> iter);
+
+ Iterator<V> getIterator();
+}