You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/15 00:36:51 UTC

incubator-mnemonic git commit: MNEMONIC-218: Make inputSession & outputSession accept the Configuration parameter; MNEMONIC-219: Remove the dependency on mapred FileOutputFormat.getUniqueFile

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master c165b1d6e -> 1705665a5


MNEMONIC-218: Make inputSession & outputSession accept the Configuration parameter;
MNEMONIC-219: Remove the dependency on mapred FileOutputFormat.getUniqueFile


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/1705665a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/1705665a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/1705665a

Branch: refs/heads/master
Commit: 1705665a5ef6ee8b74c8e34926d79e6040207738
Parents: c165b1d
Author: paley <pa...@gmail.com>
Authored: Tue Mar 14 15:19:04 2017 -0700
Committer: paley <pa...@gmail.com>
Committed: Tue Mar 14 17:24:09 2017 -0700

----------------------------------------------------------------------
 .../mnemonic/hadoop/MneDurableInputSession.java | 20 ++++++--
 .../hadoop/MneDurableOutputSession.java         | 53 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1705665a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index c473229..2d7f4c3 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -35,6 +35,7 @@ public class MneDurableInputSession<V>
     implements MneInputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
 
   private TaskAttemptContext taskAttemptContext;
+  private Configuration configuration;
   private String serviceName;
   private DurableType[] durableTypes;
   private EntityFactoryProxy[] entityFactoryProxies;
@@ -46,6 +47,11 @@ public class MneDurableInputSession<V>
 
   public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
     setTaskAttemptContext(taskAttemptContext);
+    setConfiguration(taskAttemptContext.getConfiguration());
+  }
+
+  public MneDurableInputSession(Configuration configuration) {
+    setConfiguration(configuration);
   }
 
   public void validateConfig() {
@@ -61,10 +67,10 @@ public class MneDurableInputSession<V>
 
   @Override
   public void readConfig(String prefix) {
-    if (getTaskAttemptContext() == null) {
-      throw new ConfigurationException("taskAttemptContext has not yet been set");
+    if (getConfiguration() == null) {
+      throw new ConfigurationException("configuration has not yet been set");
     }
-    Configuration conf = getTaskAttemptContext().getConfiguration();
+    Configuration conf = getConfiguration();
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -144,4 +150,12 @@ public class MneDurableInputSession<V>
   public long getHandler() {
     return m_handler;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1705665a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index ad09c43..de26b14 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -28,6 +28,7 @@ import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 
+import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -35,7 +36,9 @@ import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class MneDurableOutputSession<V>
@@ -43,6 +46,7 @@ public class MneDurableOutputSession<V>
 
   private long poolSize;
   private TaskAttemptContext taskAttemptContext;
+  private Configuration configuration;
   private String serviceName;
   private DurableType[] durableTypes;
   private EntityFactoryProxy[] entityFactoryProxies;
@@ -61,6 +65,11 @@ public class MneDurableOutputSession<V>
   public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
     setTaskAttemptContext(taskAttemptContext);
     m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
+    setConfiguration(taskAttemptContext.getConfiguration());
+  }
+  
+  public MneDurableOutputSession(Configuration configuration) {
+    setConfiguration(configuration);
   }
 
   public void validateConfig() {
@@ -79,7 +88,7 @@ public class MneDurableOutputSession<V>
     if (getTaskAttemptContext() == null) {
       throw new ConfigurationException("taskAttemptContext has not yet been set");
     }
-    Configuration conf = getTaskAttemptContext().getConfiguration();
+    Configuration conf = getConfiguration();
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -93,10 +102,40 @@ public class MneDurableOutputSession<V>
 
   protected Path genNextPoolPath() {
     Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
-        FileOutputFormat.getUniqueFile(getTaskAttemptContext(),
-            String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), MneConfigHelper.DEFAULT_FILE_EXTENSION));
+        getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), 
+            MneConfigHelper.DEFAULT_FILE_EXTENSION));
     return ret;
   }
+  
+  protected String getUniqueName(String name, String extension) {
+    int partition;
+    
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+    
+    if (null != getTaskAttemptContext()) {
+      TaskID taskId = getTaskAttemptContext().getTaskAttemptID().getTaskID();
+      partition = taskId.getId();
+    } else {
+      partition = getConfiguration().getInt(JobContext.TASK_PARTITION, -1);
+    } 
+    if (partition == -1) {
+      throw new IllegalArgumentException("This method can only be called from an application");
+    }
+    
+    String taskType = getConfiguration().getBoolean(JobContext.TASK_ISMAP, JobContext.DEFAULT_TASK_ISMAP) ? "m" : "r";
+    
+    StringBuilder result = new StringBuilder();
+    result.append(name);
+    result.append('-');
+    result.append(taskType);
+    result.append('-');
+    result.append(numberFormat.format(partition));
+    result.append(extension);
+    return result.toString();
+    
+  }
 
   @Override
   public void initNextPool() {
@@ -326,4 +365,12 @@ public class MneDurableOutputSession<V>
     this.baseOutputName = baseOutputName;
   }
 
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
 }