You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/15 00:36:51 UTC
incubator-mnemonic git commit: MNEMONIC-218: Make inputSession &
outputSession accept the Configuration parameter;
MNEMONIC-219: Remove the dependency on mapred FileOutputFormat.getUniqueFile
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master c165b1d6e -> 1705665a5
MNEMONIC-218: Make inputSession & outputSession accept the Configuration parameter;
MNEMONIC-219: Remove the dependency on mapred FileOutputFormat.getUniqueFile
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/1705665a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/1705665a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/1705665a
Branch: refs/heads/master
Commit: 1705665a5ef6ee8b74c8e34926d79e6040207738
Parents: c165b1d
Author: paley <pa...@gmail.com>
Authored: Tue Mar 14 15:19:04 2017 -0700
Committer: paley <pa...@gmail.com>
Committed: Tue Mar 14 17:24:09 2017 -0700
----------------------------------------------------------------------
.../mnemonic/hadoop/MneDurableInputSession.java | 20 ++++++--
.../hadoop/MneDurableOutputSession.java | 53 ++++++++++++++++++--
2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1705665a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index c473229..2d7f4c3 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -35,6 +35,7 @@ public class MneDurableInputSession<V>
implements MneInputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
private TaskAttemptContext taskAttemptContext;
+ private Configuration configuration;
private String serviceName;
private DurableType[] durableTypes;
private EntityFactoryProxy[] entityFactoryProxies;
@@ -46,6 +47,11 @@ public class MneDurableInputSession<V>
public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
setTaskAttemptContext(taskAttemptContext);
+ setConfiguration(taskAttemptContext.getConfiguration()); // keep the context's configuration on the session; readConfig() reads it through getConfiguration(). NOTE(review): this dereferences taskAttemptContext, so a null context fails here.
+ }
+
+ public MneDurableInputSession(Configuration configuration) { // builds a session from a configuration alone; setTaskAttemptContext() is not called on this path
+ setConfiguration(configuration);
}
public void validateConfig() {
@@ -61,10 +67,10 @@ public class MneDurableInputSession<V>
@Override
public void readConfig(String prefix) {
- if (getTaskAttemptContext() == null) {
- throw new ConfigurationException("taskAttemptContext has not yet been set");
+ if (getConfiguration() == null) {
+ throw new ConfigurationException("configuration has not yet been set");
}
- Configuration conf = getTaskAttemptContext().getConfiguration();
+ Configuration conf = getConfiguration();
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -144,4 +150,12 @@ public class MneDurableInputSession<V>
public long getHandler() {
return m_handler;
}
+
+ /**
+  * Returns the Hadoop configuration held by this input session.
+  *
+  * @return the stored configuration; {@code null} when none has been set
+  */
+ public Configuration getConfiguration() {
+   return this.configuration;
+ }
+
+ /**
+  * Stores the Hadoop configuration that this input session reads from.
+  *
+  * @param configuration the configuration to keep on this session
+  */
+ public void setConfiguration(Configuration configuration) {
+   this.configuration = configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1705665a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index ad09c43..de26b14 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -28,6 +28,7 @@ import org.apache.mnemonic.Utils;
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
+import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -35,7 +36,9 @@ import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MneDurableOutputSession<V>
@@ -43,6 +46,7 @@ public class MneDurableOutputSession<V>
private long poolSize;
private TaskAttemptContext taskAttemptContext;
+ private Configuration configuration;
private String serviceName;
private DurableType[] durableTypes;
private EntityFactoryProxy[] entityFactoryProxies;
@@ -61,6 +65,11 @@ public class MneDurableOutputSession<V>
public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
setTaskAttemptContext(taskAttemptContext);
m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
+ setConfiguration(taskAttemptContext.getConfiguration()); // keep the context's configuration on the session; readConfig() reads it through getConfiguration(). NOTE(review): this dereferences taskAttemptContext, so a null context fails here.
+ }
+
+ /**
+  * Creates an output session from a Hadoop configuration alone, without a
+  * task attempt context.
+  *
+  * @param configuration the configuration to keep on this session
+  */
+ public MneDurableOutputSession(Configuration configuration) {
+   setConfiguration(configuration);
+   // Same initialisation as the TaskAttemptContext constructor, so the
+   // record map is not left unassigned on this construction path.
+   m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
}
public void validateConfig() {
@@ -79,7 +88,7 @@ public class MneDurableOutputSession<V>
if (getTaskAttemptContext() == null) {
throw new ConfigurationException("taskAttemptContext has not yet been set");
}
- Configuration conf = getTaskAttemptContext().getConfiguration();
+ Configuration conf = getConfiguration();
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -93,10 +102,40 @@ public class MneDurableOutputSession<V>
protected Path genNextPoolPath() {
Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
- FileOutputFormat.getUniqueFile(getTaskAttemptContext(),
- String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), MneConfigHelper.DEFAULT_FILE_EXTENSION));
+ getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), // m_poolidx is pre-incremented, so every call yields the next pool file name
+ MneConfigHelper.DEFAULT_FILE_EXTENSION)); // NOTE(review): getOutputPath() above receives getTaskAttemptContext() with no null check, whereas getUniqueName() tolerates a null context - confirm a context is always set before this runs
return ret;
}
+
+ /**
+  * Builds a per-task unique file name of the form
+  * {@code <name>-<task type>-<partition><extension>}, with the partition
+  * rendered with at least five digits and no grouping separators.
+  *
+  * <p>When a task attempt context is available, both the partition and the
+  * task-type letter come from its task id. Otherwise they are read from the
+  * configuration ({@code JobContext.TASK_PARTITION} and
+  * {@code JobContext.TASK_ISMAP}).
+  *
+  * @param name the base name placed at the start of the result
+  * @param extension the suffix appended verbatim at the end of the result
+  * @return the generated file name
+  * @throws IllegalArgumentException if the partition resolves to -1
+  */
+ protected String getUniqueName(String name, String extension) {
+   int partition;
+   String taskType;
+
+   if (null != getTaskAttemptContext()) {
+     // Take the task type from the same task id as the partition, as the
+     // replaced FileOutputFormat.getUniqueFile() did. The TASK_ISMAP flag
+     // defaults to "map" when unset and could disagree with the task id.
+     TaskID taskId = getTaskAttemptContext().getTaskAttemptID().getTaskID();
+     partition = taskId.getId();
+     taskType = String.valueOf(TaskID.getRepresentingCharacter(taskId.getTaskType()));
+   } else {
+     partition = getConfiguration().getInt(JobContext.TASK_PARTITION, -1);
+     taskType = getConfiguration().getBoolean(JobContext.TASK_ISMAP, JobContext.DEFAULT_TASK_ISMAP) ? "m" : "r";
+   }
+   if (partition == -1) {
+     throw new IllegalArgumentException("This method can only be called from an application");
+   }
+
+   NumberFormat numberFormat = NumberFormat.getInstance();
+   numberFormat.setMinimumIntegerDigits(5);
+   numberFormat.setGroupingUsed(false);
+
+   StringBuilder result = new StringBuilder();
+   result.append(name);
+   result.append('-');
+   result.append(taskType);
+   result.append('-');
+   result.append(numberFormat.format(partition));
+   result.append(extension);
+   return result.toString();
+ }
@Override
public void initNextPool() {
@@ -326,4 +365,12 @@ public class MneDurableOutputSession<V>
this.baseOutputName = baseOutputName;
}
+ /**
+  * Returns the Hadoop configuration held by this output session.
+  *
+  * @return the stored configuration; {@code null} when none has been set
+  */
+ public Configuration getConfiguration() {
+   return this.configuration;
+ }
+
+ /**
+  * Stores the Hadoop configuration that this output session reads from.
+  *
+  * @param configuration the configuration to keep on this session
+  */
+ public void setConfiguration(Configuration configuration) {
+   this.configuration = configuration;
+ }
+
}