You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text rendering.)
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/03/15 20:17:39 UTC
incubator-mnemonic git commit: MNEMONIC-216: Support Hadoop mapred
APIs; MNEMONIC-220: Remove the dependency on OutputDir of FileOutputFormat
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 1705665a5 -> 224ef2fec
MNEMONIC-216: Support Hadoop mapred APIs
MNEMONIC-220: Remove the dependency on OutputDir of FileOutputFormat
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/224ef2fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/224ef2fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/224ef2fe
Branch: refs/heads/master
Commit: 224ef2feceaa0da78b38f306f94f73b7d9a4d53e
Parents: 1705665
Author: paley <pa...@gmail.com>
Authored: Wed Mar 15 09:46:48 2017 -0700
Committer: paley <pa...@gmail.com>
Committed: Wed Mar 15 12:57:38 2017 -0700
----------------------------------------------------------------------
build-tools/test.conf | 3 +
.../apache/mnemonic/hadoop/MneConfigHelper.java | 9 +
.../mnemonic/hadoop/MneDurableInputSession.java | 8 +-
.../hadoop/MneDurableOutputSession.java | 20 +-
.../mnemonic/hadoop/mapred/MneInputFormat.java | 50 +++++
.../hadoop/mapred/MneMapredRecordReader.java | 90 ++++++++
.../hadoop/mapred/MneMapredRecordWriter.java | 49 +++++
.../mnemonic/hadoop/mapred/MneOutputFormat.java | 45 ++++
.../mapred/MneMapredBufferDataTest.java | 206 +++++++++++++++++++
.../mapreduce/MneMapreduceBufferDataTest.java | 2 +-
.../mapreduce/MneMapreduceChunkDataTest.java | 2 +-
.../mapreduce/MneMapreduceLongDataTest.java | 2 +-
.../mapreduce/MneMapreducePersonDataTest.java | 2 +-
13 files changed, 472 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index cbaa909..e46fb9a 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -50,3 +50,6 @@ mvn -Dtest=MneMapreduceLongDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-map
mvn -Dtest=MneMapreduceBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
+
+# a testcase for module "mnemonic-hadoop/mnemonic-hadoop-mapreduce" that requires 'pmalloc' memory service to pass
+mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
index b8974a4..e2b10fb 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
@@ -44,6 +44,7 @@ public class MneConfigHelper {
public static final String DEFAULT_NAME_PART = "part";
public static final String DEFAULT_FILE_EXTENSION = ".mne";
public static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+ public static final String DIR = "dir";
private static final Logger LOGGER = LoggerFactory.getLogger(MneConfigHelper.class);
@@ -129,5 +130,13 @@ public class MneConfigHelper {
public static long getMemPoolSize(Configuration conf, String prefix) {
return conf.getLong(getConfigName(prefix, MEM_POOL_SIZE), DEFAULT_OUTPUT_MEM_POOL_SIZE);
}
+ // Returns the directory stored under getConfigName(prefix, DIR), or null if it has not been set.
+ public static String getDir(Configuration conf, String prefix) {
+ return conf.get(getConfigName(prefix, DIR));
+ }
+ // Stores dirname under getConfigName(prefix, DIR); the counterpart of getDir.
+ public static void setDir(Configuration conf, String prefix, String dirname) {
+ conf.set(getConfigName(prefix, DIR), dirname);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 2d7f4c3..469dc7a 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -46,8 +46,8 @@ public class MneDurableInputSession<V>
public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
+ this(taskAttemptContext.getConfiguration());
setTaskAttemptContext(taskAttemptContext);
- setConfiguration(taskAttemptContext.getConfiguration());
}
public MneDurableInputSession(Configuration configuration) {
@@ -67,10 +67,10 @@ public class MneDurableInputSession<V>
@Override
public void readConfig(String prefix) {
- if (getConfiguration() == null) {
- throw new ConfigurationException("configuration has not yet been set");
- }
Configuration conf = getConfiguration();
+ if (conf == null) {
+ throw new ConfigurationException("Configuration has not yet been set");
+ }
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index de26b14..71bc861 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MneDurableOutputSession<V>
implements MneOutputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
@@ -63,13 +62,13 @@ public class MneDurableOutputSession<V>
protected Iterator<V> m_iter;
public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
+ this(taskAttemptContext.getConfiguration());
setTaskAttemptContext(taskAttemptContext);
- m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
- setConfiguration(taskAttemptContext.getConfiguration());
}
public MneDurableOutputSession(Configuration configuration) {
setConfiguration(configuration);
+ m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
}
public void validateConfig() {
@@ -85,10 +84,10 @@ public class MneDurableOutputSession<V>
@Override
public void readConfig(String prefix) {
- if (getTaskAttemptContext() == null) {
- throw new ConfigurationException("taskAttemptContext has not yet been set");
- }
Configuration conf = getConfiguration();
+ if (conf == null) {
+ throw new ConfigurationException("Configuration has not yet been set");
+ }
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -101,12 +100,17 @@ public class MneDurableOutputSession<V>
}
protected Path genNextPoolPath() {
- Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
+ Path ret = new Path(getOutputDir(),
getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx),
MneConfigHelper.DEFAULT_FILE_EXTENSION));
return ret;
}
-
+ // Pool directory from the output-prefix "dir" setting. NOTE(review): null if unset, and genNextPoolPath passes it to new Path(parent, child) — likely an NPE; consider failing fast.
+ protected Path getOutputDir() {
+ String name = MneConfigHelper.getDir(getConfiguration(), MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+ return name == null ? null : new Path(name);
+ }
+
protected String getUniqueName(String name, String extension) {
int partition;
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
new file mode 100644
index 0000000..61ed385
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * A Mnemonic input format that satisfies the org.apache.hadoop.mapred API; each FileSplit is read by an MneMapredRecordReader.
+ */
+
+public class MneInputFormat<MV extends MneDurableInputValue<V>, V>
+ extends FileInputFormat<NullWritable, MV> {
+ // A split that is not a FileSplit throws ClassCastException; reporter is unused. NOTE(review): 'inputSpilt' is a typo for 'inputSplit'.
+ @Override
+ public RecordReader<NullWritable, MV>
+ getRecordReader(InputSplit inputSpilt,
+ JobConf jobConf,
+ Reporter reporter
+ ) throws IOException {
+ MneMapredRecordReader<MV, V> reader =
+ new MneMapredRecordReader<MV, V>((FileSplit) inputSpilt, jobConf);
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
new file mode 100644
index 0000000..54db99f
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapred API over the items of an MneDurableInputSession opened on one FileSplit.
+ * @param <MV> the value wrapper type; each item is bound to it via {@code of}
+ * @param <V>
+ * the type of the data item
+ */
+
+public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
+ // Iterator over the session's items; advanced by next().
+ protected Iterator<V> m_iter;
+ protected MneDurableInputSession<V> m_session;
+ protected FileSplit m_fileSplit;
+
+ // Opens an input session on the split's path using the input-prefix settings from the job configuration.
+ public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
+ m_fileSplit = fileSplit;
+ m_session = new MneDurableInputSession<V>(conf);
+ m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+ m_session.initialize(m_fileSplit.getPath());
+ m_iter = m_session.iterator();
+ }
+ // Binds the next item to 'value' and returns true, or returns false once the iterator is exhausted; 'key' is unused.
+ @Override
+ public boolean next(NullWritable key, MV value) throws IOException {
+ boolean ret = false;
+ if (m_iter.hasNext()) {
+ value.of(m_iter.next());
+ ret = true;
+ }
+ return ret;
+ }
+ // Keys carry no information; always the NullWritable singleton.
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+ // NOTE(review): unchecked cast is only safe when MV is exactly MneDurableInputValue<V>; a subclass MV fails at the caller.
+ @SuppressWarnings("unchecked")
+ @Override
+ public MV createValue() {
+ return (MV) new MneDurableInputValue<V>(m_session);
+ }
+ // NOTE(review): reports the split length, not a read position, so the value never changes while reading.
+ @Override
+ public long getPos() throws IOException {
+ return m_fileSplit.getLength();
+ }
+ // Closes the input session opened in the constructor.
+ @Override
+ public void close() throws IOException {
+ m_session.close();
+ }
+ // NOTE(review): progress tracking is not implemented; always reports 0.5.
+ @Override
+ public float getProgress() throws IOException {
+ return 0.5f; /* TBD */
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
new file mode 100644
index 0000000..cfd27f9
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * This record writer implements the org.apache.hadoop.mapred API by calling {@code post()} on each written value.
+ *
+ * @param <MV>
+ * the durable output value type accepted by this writer
+ */
+
+public class MneMapredRecordWriter<MV extends MneDurableOutputValue<?>>
+ implements RecordWriter<NullWritable, MV> {
+ // Holds no state; persistence is delegated to MneDurableOutputValue.post().
+ // Writes one record by posting the value; 'key' is ignored.
+ @Override
+ public void write(NullWritable key, MV value) throws IOException {
+ value.post();
+ }
+ // No-op: nothing is closed here; presumably the owner of the values' output session must close it.
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
new file mode 100644
index 0000000..be0b997
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * A Mnemonic output format that satisfies the org.apache.hadoop.mapred API.
+ */
+
+public class MneOutputFormat<MV extends MneDurableOutputValue<?>> extends FileOutputFormat<NullWritable, MV> {
+ // Returns a fresh MneMapredRecordWriter; all arguments are ignored because the writer only calls post() on each value.
+ @Override
+ public RecordWriter<NullWritable, MV> getRecordWriter(FileSystem ignored, JobConf job, String name,
+ Progressable progress) throws IOException {
+ return new MneMapredRecordWriter<MV>();
+ }
+ // NOTE(review): checkOutputSpecs is inherited from FileOutputFormat and checks the standard output dir, not MneConfigHelper's "dir" — confirm intended (MNEMONIC-220).
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
new file mode 100644
index 0000000..c178ee4
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapred/MneMapredBufferDataTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.mnemonic.DurableBuffer;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MneMapredBufferDataTest {
+ // Round-trips random DurableBuffer records through the mapred MneOutputFormat and MneInputFormat.
+ private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+ private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "mapred-buffer-data";
+ private static final String SERVICE_NAME = "pmalloc";
+ private static final long SLOT_KEY_ID = 5L;
+ private static final int TASK_PARTITION = 0;
+ // State shared between the write test and the read test that depends on it.
+ private Path m_workdir;
+ private JobConf m_conf;
+ private FileSystem m_fs;
+ private Random m_rand;
+ private long m_reccnt = 5000L;
+ private volatile long m_checksum;
+ private volatile long m_totalsize = 0L;
+ private List<String> m_partfns;
+
+ @BeforeClass
+ public void setUp() throws IOException {
+ m_workdir = new Path(
+ System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+ m_conf = new JobConf();
+ m_rand = Utils.createRandom();
+ m_partfns = new ArrayList<String>();
+ // Recreate an empty work directory on the raw local file system.
+ try {
+ m_fs = FileSystem.getLocal(m_conf).getRaw();
+ m_fs.delete(m_workdir, true);
+ m_fs.mkdirs(m_workdir);
+ } catch (IOException e) {
+ throw new IllegalStateException("bad fs init", e);
+ }
+ // Pin the task partition; presumably consumed by MneDurableOutputSession.getUniqueName when naming pool files.
+ m_conf.setInt(JobContext.TASK_PARTITION, TASK_PARTITION);
+ // Output pools go under m_workdir via the Mnemonic "dir" setting rather than FileOutputFormat's output path.
+ MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
+ MneConfigHelper.setBaseOutputName(m_conf, null, "mapred-buffer-data");
+ // Input and output prefixes share the pmalloc service, slot key, BUFFER type and empty proxy list; output pools are 4 GiB.
+ MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+ MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+ MneConfigHelper.setDurableTypes(m_conf,
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+ MneConfigHelper.setEntityFactoryProxies(m_conf,
+ MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {});
+ MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+ MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+ MneConfigHelper.setMemPoolSize(m_conf,
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+ MneConfigHelper.setDurableTypes(m_conf,
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+ MneConfigHelper.setEntityFactoryProxies(m_conf,
+ MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {});
+ }
+
+ @AfterClass
+ public void tearDown() {
+ // Nothing is cleaned up; the work directory is left on disk.
+ }
+ // Allocates a random 1-2 MiB buffer, fills it with random bytes, updates the checksum and total size; returns null if the session yields no record.
+ protected DurableBuffer<?> genupdDurableBuffer(
+ MneDurableOutputSession<DurableBuffer<?>> s, Checksum cs) {
+ DurableBuffer<?> ret = null;
+ int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024;
+ ret = s.newDurableObjectRecord(sz);
+ if (null != ret) {
+ ret.get().clear();
+ byte[] rdbytes = RandomUtils.nextBytes(sz);
+ Assert.assertNotNull(rdbytes);
+ ret.get().put(rdbytes);
+ cs.update(rdbytes, 0, rdbytes.length);
+ m_totalsize += sz;
+ }
+ return ret;
+ }
+ // Writes m_reccnt random buffers through the mapred RecordWriter and records their CRC32.
+ @Test(enabled = true)
+ public void testWriteBufferData() throws Exception {
+ NullWritable nada = NullWritable.get();
+ MneDurableOutputSession<DurableBuffer<?>> sess = new MneDurableOutputSession<DurableBuffer<?>>(m_conf);
+ sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+ sess.initNextPool();
+ MneDurableOutputValue<DurableBuffer<?>> mdvalue =
+ new MneDurableOutputValue<DurableBuffer<?>>(sess);
+ OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
+ new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
+ RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
+ outputFormat.getRecordWriter(m_fs, m_conf, null, null);
+ DurableBuffer<?> dbuf = null;
+ Checksum cs = new CRC32();
+ cs.reset();
+ for (int i = 0; i < m_reccnt; ++i) {
+ dbuf = genupdDurableBuffer(sess, cs);
+ Assert.assertNotNull(dbuf);
+ writer.write(nada, mdvalue.of(dbuf));
+ }
+ m_checksum = cs.getValue();
+ writer.close(null);
+ sess.close();
+ }
+ // Reads back every matching pool file in sorted name order and checks record count, total size and CRC32 against the write pass.
+ @Test(enabled = true, dependsOnMethods = { "testWriteBufferData" })
+ public void testReadBufferData() throws Exception {
+ long reccnt = 0L;
+ long tsize = 0L;
+ byte[] buf;
+ Checksum cs = new CRC32();
+ cs.reset();
+ File folder = new File(m_workdir.toString());
+ File[] listfiles = folder.listFiles();
+ for (int idx = 0; idx < listfiles.length; ++idx) {
+ if (listfiles[idx].isFile()
+ && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+ && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+ m_partfns.add(listfiles[idx].getName());
+ }
+ }
+ Collections.sort(m_partfns); // keep the order for checksum
+ for (int idx = 0; idx < m_partfns.size(); ++idx) {
+ System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+ FileSplit split = new FileSplit(
+ new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+ InputFormat<NullWritable, MneDurableInputValue<DurableBuffer<?>>> inputFormat =
+ new MneInputFormat<MneDurableInputValue<DurableBuffer<?>>, DurableBuffer<?>>();
+ RecordReader<NullWritable, MneDurableInputValue<DurableBuffer<?>>> reader =
+ inputFormat.getRecordReader((InputSplit) split, m_conf, null);
+ NullWritable dbufkey = reader.createKey();
+ MneDurableInputValue<DurableBuffer<?>> dbufval = null;
+ while (true) {
+ dbufval = reader.createValue();
+ if (reader.next(dbufkey, dbufval)) {
+ assert dbufval.getValue().getSize() == dbufval.getValue().get().capacity();
+ dbufval.getValue().get().clear();
+ buf = new byte[dbufval.getValue().get().capacity()];
+ dbufval.getValue().get().get(buf);
+ cs.update(buf, 0, buf.length);
+ tsize += dbufval.getValue().getSize();
+ ++reccnt;
+ } else {
+ break;
+ }
+ }
+ reader.close();
+ }
+ AssertJUnit.assertEquals(m_reccnt, reccnt);
+ AssertJUnit.assertEquals(m_totalsize, tsize);
+ AssertJUnit.assertEquals(m_checksum, cs.getValue());
+ System.out.println(String.format("The checksum of buffer is %d", m_checksum));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
index e29a8c8..7b65661 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
@@ -92,7 +92,7 @@ public class MneMapreduceBufferDataTest {
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
- m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+ MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "buffer-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
index 1dfe7ad..4a007d2 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
@@ -96,7 +96,7 @@ public class MneMapreduceChunkDataTest {
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
- m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+ MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
index bc6c2de..18a9e81 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceLongDataTest.java
@@ -81,7 +81,7 @@ public class MneMapreduceLongDataTest {
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
- m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+ MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/224ef2fe/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
index 635ed38..5d74838 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java
@@ -81,7 +81,7 @@ public class MneMapreducePersonDataTest {
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
- m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+ MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);