You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/26 06:10:38 UTC
[2/2] git commit: TEZ-503. Fixes to get BroadcastInput /
BroadcastOutput working. (sseth)
TEZ-503. Fixes to get BroadcastInput / BroadcastOutput working. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3334ca14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3334ca14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3334ca14
Branch: refs/heads/master
Commit: 3334ca1484a1b79d5356638cdb42f139187347e9
Parents: 906be8f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 25 21:10:04 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 25 21:10:04 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tez/dag/api/DAG.java | 5 -
.../runtime/api/events/DataMovementEvent.java | 5 +
.../tez/mapreduce/examples/ExampleDriver.java | 2 +
.../mapreduce/examples/FilterLinesByWord.java | 266 +++++++++++++++++++
.../processor/FilterByWordInputProcessor.java | 127 +++++++++
.../processor/FilterByWordOutputProcessor.java | 102 +++++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 6 +-
.../runtime/api/impl/TezInputContextImpl.java | 6 +-
.../runtime/api/impl/TezOutputContextImpl.java | 6 +-
.../api/impl/TezProcessorContextImpl.java | 6 +-
.../runtime/api/impl/TezTaskContextImpl.java | 7 +-
tez-runtime-library/pom.xml | 5 +
.../broadcast/input/BroadcastInputManager.java | 21 +-
.../broadcast/input/BroadcastKVReader.java | 12 +-
.../BroadcastShuffleInputEventHandler.java | 5 +
.../input/BroadcastShuffleManager.java | 81 ++++--
.../broadcast/output/FileBasedKVWriter.java | 4 +
.../common/shuffle/impl/ShuffleScheduler.java | 4 +-
.../runtime/library/common/sort/impl/IFile.java | 5 +-
.../library/input/ShuffledUnorderedKVInput.java | 17 +-
.../library/output/OnFileUnorderedKVOutput.java | 11 +-
.../runtime/library/shuffle/common/Fetcher.java | 4 +-
.../library/shuffle/common/InputHost.java | 7 +
.../shuffle/common/MemoryFetchedInput.java | 2 +-
.../input/TestBroadcastInputManager.java | 84 ++++++
.../library/common/sort/impl/TestIFile.java | 205 ++++++++++++++
.../output/TestOnFileUnorderedKVOutput.java | 155 +++++++++++
.../runtime/library/testutils/KVDataGen.java | 64 +++++
28 files changed, 1161 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 9cb602c..f64dc18 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -171,11 +171,6 @@ public class DAG { // FIXME rename to Topology
if(restricted){
for(Edge e : edges){
vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
- if (e.getEdgeProperty().getDataMovementType() !=
- DataMovementType.SCATTER_GATHER) {
- throw new IllegalStateException(
- "Unsupported connection pattern on edge. " + e);
- }
if (e.getEdgeProperty().getDataSourceType() !=
DataSourceType.PERSISTED) {
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index b384676..e5f9b05 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -106,4 +106,9 @@ public final class DataMovementEvent extends Event {
this.version = version;
}
+ @Override
+ public String toString() {
+ return "DataMovementEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+ + targetIndex + ", version=" + version + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 6c062a4..cb7abe1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -74,6 +74,8 @@ public class ExampleDriver {
"MRR Sleep Job");
pgd.addClass("orderedwordcount", OrderedWordCount.class,
"Word Count with words sorted on frequency");
+ pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
+ "Filters lines by the specified word");
exitCode = pgd.run(argv);
}
catch(Throwable e){
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
new file mode 100644
index 0000000..640d734
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.processor.FilterByWordInputProcessor;
+import org.apache.tez.processor.FilterByWordOutputProcessor;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * Example Tez client that runs a two-vertex DAG which filters input records by
+ * a word.
+ * <p>
+ * Vertex "stage1" runs {@link FilterByWordInputProcessor} with one task per
+ * generated input split. It is connected through a BROADCAST / PERSISTED /
+ * SEQUENTIAL edge ({@link OnFileUnorderedKVOutput} on the producer side,
+ * {@link ShuffledUnorderedKVInput} on the consumer side) to vertex "stage2",
+ * which runs a single {@link FilterByWordOutputProcessor} task.
+ */
+public class FilterLinesByWord {
+
+  private static final Log LOG = LogFactory.getLog(FilterLinesByWord.class);
+
+  /** Configuration key used to hand the filter word to the stage1 processor. */
+  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+
+  /**
+   * Builds the DAG, submits it through a {@link TezSession} and polls its
+   * status until it is no longer RUNNING.
+   * <p>
+   * Process exit codes: 2 for wrong usage or an already existing output
+   * directory, -1 when the DAG status can no longer be fetched, 0 when the DAG
+   * SUCCEEDED and 1 for any other final state.
+   *
+   * @param args generic Hadoop options followed by
+   *        {@code <in> <out> <filter_word>}
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
+    Configuration conf = new Configuration();
+    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length != 3) {
+      // Use the name under which ExampleDriver registers this example.
+      System.err.println("Usage: filterLinesByWord <in> <out> <filter_word>");
+      System.exit(2);
+    }
+
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
+    String filterWord = otherArgs[2];
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputPath))) {
+      System.err.println("Output directory : " + outputPath + " already exists");
+      System.exit(2);
+    }
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+
+    // A fresh, randomly named staging directory below the working directory.
+    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, MRHelpers.getMRAMJavaOpts(tezConf));
+
+    String jarPath = ClassUtil.findContainingJar(FilterLinesByWord.class);
+    if (jarPath == null) {
+      throw new TezUncheckedException("Could not find any jar containing "
+          + FilterLinesByWord.class.getName() + " in the classpath");
+    }
+
+    // Ship the jar containing this class to the staging directory and register
+    // it as a local resource shared by the AM and both vertices.
+    Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
+    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    AMConfiguration amConf = new AMConfiguration("default", null, commonLocalResources, tezConf, null);
+    TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
+    TezSession tezSession = new TezSession("FilterLinesByWordSession", sessionConf);
+    tezSession.start(); // Why do I need to start the TezSession.
+
+    Configuration stage1Conf = new JobConf(conf);
+    stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+    stage1Conf.setBoolean("mapred.mapper.new-api", false);
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, TextLongPair.class.getName());
+    stage1Conf.set(FILTER_PARAM_NAME, filterWord);
+
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+
+    Configuration stage2Conf = new JobConf(conf);
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, TextLongPair.class.getName());
+    stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
+    stage2Conf.setBoolean("mapred.mapper.new-api", false);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
+
+    MRHelpers.doJobClientMagic(stage1Conf);
+    MRHelpers.doJobClientMagic(stage2Conf);
+
+    // Setup stage1 Vertex
+    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+        FilterByWordInputProcessor.class.getName()).setUserPayload(MRHelpers
+        .createUserPayloadFromConf(stage1Conf)), inputSplitInfo.getNumTasks(),
+        MRHelpers.getMapResource(stage1Conf));
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf)).setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+    stage1LocalResources.putAll(commonLocalResources);
+    MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
+    stage1Vertex.setTaskLocalResources(stage1LocalResources);
+    Map<String, String> stage1Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
+    stage1Vertex.setTaskEnvironment(stage1Env);
+
+    // Setup stage2 Vertex
+    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+        FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+        .createUserPayloadFromConf(stage2Conf)), 1,
+        MRHelpers.getMapResource(stage2Conf));
+    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf)).setTaskLocalResources(commonLocalResources);
+    Map<String, String> stage2Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
+    stage2Vertex.setTaskEnvironment(stage2Env);
+
+    DAG dag = new DAG("FilterLinesByWord");
+    Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+        DataMovementType.BROADCAST, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+            OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
+            ShuffledUnorderedKVInput.class.getName())));
+    dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
+
+    LOG.info("Submitting DAG to Tez Session");
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    LOG.info("Submitted DAG to Tez Session");
+
+    DAGStatus dagStatus = null;
+    // Set when status polling fails; the process exits only after cleanup has
+    // run, because System.exit() inside the try would skip the finally block.
+    boolean statusFetchFailed = false;
+    try {
+      // Phase 1: wait until the DAG is RUNNING or already in a final state.
+      while (true) {
+        dagStatus = dagClient.getDAGStatus();
+        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+            dagStatus.getState() == DAGStatus.State.FAILED ||
+            dagStatus.getState() == DAGStatus.State.KILLED ||
+            dagStatus.getState() == DAGStatus.State.ERROR) {
+          break;
+        }
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ignored) {
+          // Deliberately ignored: keep polling.
+        }
+      }
+
+      // Phase 2: report progress once a second while the DAG is RUNNING.
+      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+        try {
+          ExampleDriver.printMRRDAGStatus(dagStatus);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ignored) {
+            // Deliberately ignored: keep polling.
+          }
+          dagStatus = dagClient.getDAGStatus();
+        } catch (TezException e) {
+          LOG.fatal("Failed to get application progress. Exiting", e);
+          statusFetchFailed = true;
+          break;
+        }
+      }
+    } finally {
+      // Stop the session even if removing the staging directory fails.
+      try {
+        fs.delete(stagingDir, true);
+      } finally {
+        tezSession.stop();
+      }
+    }
+
+    if (statusFetchFailed) {
+      System.exit(-1);
+    }
+
+    ExampleDriver.printMRRDAGStatus(dagStatus);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+  }
+
+  /**
+   * Writable value holding a {@link Text} and a {@link LongWritable}; used as
+   * the intermediate value class between stage1 and stage2.
+   */
+  public static class TextLongPair implements Writable {
+
+    private Text text;
+    private LongWritable longWritable;
+
+    /**
+     * Creates an empty pair. Both fields stay {@code null} until
+     * {@link #readFields(DataInput)} is called, so {@link #write(DataOutput)}
+     * and {@link #toString()} must not be used before that.
+     */
+    public TextLongPair() {
+    }
+
+    /** Creates a pair that holds references to (not copies of) both arguments. */
+    public TextLongPair(Text text, LongWritable longWritable) {
+      this.text = text;
+      this.longWritable = longWritable;
+    }
+
+    /** Serializes the text followed by the long. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      this.text.write(out);
+      this.longWritable.write(out);
+    }
+
+    /** Replaces both fields with fresh instances read in the order written by {@link #write}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.text = new Text();
+      this.longWritable = new LongWritable();
+      text.readFields(in);
+      longWritable.readFields(in);
+    }
+
+    /** Returns the text and the long value separated by a tab. */
+    @Override
+    public String toString() {
+      return text.toString() + "\t" + longWritable.get();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
new file mode 100644
index 0000000..e8e315a
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVReader.KVRecord;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * Processor that reads key/value records from an {@link MRInput} and forwards
+ * every value containing the configured filter word to an
+ * {@link OnFileUnorderedKVOutput}.
+ * <p>
+ * Each forwarded record uses the matching value as its key and a
+ * {@link TextLongPair} of (input file name, original record key) as its value.
+ */
+public class FilterByWordInputProcessor implements LogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(FilterByWordInputProcessor.class);
+
+  /** Word a value must contain to be forwarded; read from the user payload. */
+  private String filterWord;
+
+  public FilterByWordInputProcessor() {
+  }
+
+  /**
+   * Reads the filter word from the configuration carried in the user payload
+   * and reports a fatal error through the context when it is absent.
+   */
+  @Override
+  public void initialize(TezProcessorContext processorContext) throws Exception {
+    Configuration payloadConf =
+        TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
+    this.filterWord = payloadConf.get(FilterLinesByWord.FILTER_PARAM_NAME);
+    if (this.filterWord == null) {
+      processorContext.fatalError(null, "No filter word specified");
+    }
+  }
+
+  /** This processor does not handle events; any call is rejected. */
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    throw new UnsupportedOperationException("Not expecting any events to the broadcast processor");
+  }
+
+  /** Nothing to release; only logs that the processor is closing. */
+  @Override
+  public void close() throws Exception {
+    LOG.info("Broadcast Processor closing. Nothing to do");
+  }
+
+  /**
+   * Validates that exactly one {@link MRInput} and one
+   * {@link OnFileUnorderedKVOutput} are attached, then copies every matching
+   * record from the input to the output.
+   *
+   * @throws IllegalStateException when the number or type of inputs/outputs
+   *         is not as expected
+   */
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+
+    // Cardinality is checked for both sides before either element is fetched.
+    if (inputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single input");
+    }
+    if (outputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single output");
+    }
+
+    LogicalInput onlyInput = inputs.values().iterator().next();
+    if (!(onlyInput instanceof MRInput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with MRInput");
+    }
+    LogicalOutput onlyOutput = outputs.values().iterator().next();
+    if (!(onlyOutput instanceof OnFileUnorderedKVOutput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with OnFileUnorderedKVOutput");
+    }
+
+    MRInput source = (MRInput) onlyInput;
+    OnFileUnorderedKVOutput sink = (OnFileUnorderedKVOutput) onlyOutput;
+
+    // The name of the file being processed is taken from the input's config updates.
+    String fileName = source.getConfigUpdates().get(MRJobConfig.MAP_INPUT_FILE);
+    LOG.info("Processing file: " + fileName);
+    Text srcFile = new Text();
+    srcFile.set(fileName == null ? "UNKNOWN_FILENAME_IN_PROCESSOR" : fileName);
+
+    KVReader reader = source.getReader();
+    KVWriter writer = sink.getWriter();
+
+    while (reader.next()) {
+      KVRecord record = reader.getCurrentKV();
+      Object recordKey = record.getKey();
+      Text line = (Text) record.getValues().iterator().next();
+
+      if (!line.toString().contains(filterWord)) {
+        continue;
+      }
+      // The key is only cast for records that are actually forwarded.
+      writer.write(line, new TextLongPair(srcFile, (LongWritable) recordKey));
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
new file mode 100644
index 0000000..1cb5bad
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVReader.KVRecord;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+
+
+public class FilterByWordOutputProcessor implements LogicalIOProcessor {
+
+ private static final Log LOG = LogFactory.getLog(MapProcessor.class);
+ private TezProcessorContext processorContext;
+
+ public FilterByWordOutputProcessor() {
+ }
+
+ @Override
+ public void initialize(TezProcessorContext processorContext) throws Exception {
+ this.processorContext = processorContext;
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+ throw new UnsupportedOperationException("Not expecting any events to the broadcast output processor");
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("Broadcast Output Processor closing. Nothing to do");
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs) throws Exception {
+
+ if (inputs.size() != 1) {
+ throw new IllegalStateException("TestBroadcast processor can only work with a single input");
+ }
+
+ if (outputs.size() != 1) {
+ throw new IllegalStateException("TestBroadcast processor can only work with a single output");
+ }
+
+ LogicalInput li = inputs.values().iterator().next();
+ if (! (li instanceof ShuffledUnorderedKVInput)) {
+ throw new IllegalStateException("TestBroadcast processor can only work with ShuffledUnorderedKVInput");
+ }
+
+ LogicalOutput lo = outputs.values().iterator().next();
+ if (! (lo instanceof MROutput)) {
+ throw new IllegalStateException("TestBroadcast processor can only work with MROutput");
+ }
+
+ ShuffledUnorderedKVInput kvInput = (ShuffledUnorderedKVInput) li;
+ MROutput mrOutput = (MROutput) lo;
+
+ KVReader kvReader = kvInput.getReader();
+ KVWriter kvWriter = mrOutput.getWriter();
+ while (kvReader.next()) {
+ KVRecord kvRecord = kvReader.getCurrentKV();
+ Object key = kvRecord.getKey();
+ Object value = kvRecord.getValues().iterator().next();
+
+ kvWriter.write(key, value);
+ }
+ if (processorContext.canCommit()) {
+ mrOutput.commit();
+ } else {
+ mrOutput.abort();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 5c8ec2b..6beec91 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -230,7 +230,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
.getProcessorDescriptor().getUserPayload() : inputSpec
.getInputDescriptor().getUserPayload(), this,
- serviceConsumerMetadata);
+ serviceConsumerMetadata, System.getenv());
return inputContext;
}
@@ -242,7 +242,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
.getProcessorDescriptor().getUserPayload() : outputSpec
.getOutputDescriptor().getUserPayload(), this,
- serviceConsumerMetadata);
+ serviceConsumerMetadata, System.getenv());
return outputContext;
}
@@ -250,7 +250,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
tezCounters, processorDescriptor.getUserPayload(), this,
- serviceConsumerMetadata);
+ serviceConsumerMetadata, System.getenv());
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 9169895..f9d1b1a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -44,9 +44,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
TezUmbilical tezUmbilical, String taskVertexName,
String sourceVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters, byte[] userPayload,
- RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+ RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> auxServiceEnv) {
super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+ counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+ auxServiceEnv);
this.userPayload = userPayload;
this.sourceVertexName = sourceVertexName;
this.sourceInfo = new EventMetaData(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index fd4c3a3..e97f1db 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -45,9 +45,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
String destinationVertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload, RuntimeTask runtimeTask,
- Map<String, ByteBuffer> serviceConsumerMetadata) {
+ Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> auxServiceEnv) {
super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+ counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+ auxServiceEnv);
this.userPayload = userPayload;
this.destinationVertexName = destinationVertexName;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index e73baf4..752376f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -42,9 +42,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
TezUmbilical tezUmbilical, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload, RuntimeTask runtimeTask,
- Map<String, ByteBuffer> serviceConsumerMetadata) {
+ Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> auxServiceEnv) {
super(conf, appAttemptNumber, vertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+ counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+ auxServiceEnv);
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index ee9e96d..b3dae4e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -45,12 +45,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
protected final TezUmbilical tezUmbilical;
private final Map<String, ByteBuffer> serviceConsumerMetadata;
private final int appAttemptNumber;
+ private final Map<String, String> auxServiceEnv;
@Private
public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
String taskVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters, RuntimeTask runtimeTask,
- TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+ TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> auxServiceEnv) {
this.conf = conf;
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
@@ -63,6 +65,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
this.serviceConsumerMetadata = serviceConsumerMetadata;
// TODO NEWTEZ at some point dag attempt should not map to app attempt
this.appAttemptNumber = appAttemptNumber;
+ this.auxServiceEnv = auxServiceEnv;
}
@Override
@@ -123,7 +126,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
@Override
public ByteBuffer getServiceProviderMetaData(String serviceName) {
return AuxiliaryServiceHelper.getServiceDataFromEnv(
- serviceName, System.getenv());
+ serviceName, auxServiceEnv);
}
protected void signalFatalError(Throwable t, String message,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index dcdabe1..625ef29 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -34,6 +34,11 @@
<artifactId>tez-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index cda52da..add7371 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -20,11 +20,12 @@ package org.apache.tez.runtime.library.broadcast.input;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -37,6 +38,8 @@ import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
public class BroadcastInputManager implements FetchedInputAllocator,
FetchedInputCallback {
+ private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
+
private final Configuration conf;
private final TezTaskOutputFiles fileNameAllocator;
@@ -46,13 +49,13 @@ public class BroadcastInputManager implements FetchedInputAllocator,
private final long memoryLimit;
private final long maxSingleShuffleLimit;
- private long usedMemory = 0;
+ private volatile long usedMemory = 0;
- public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+ public BroadcastInputManager(String uniqueIdentifier, Configuration conf) {
this.conf = conf;
this.fileNameAllocator = new TezTaskOutputFiles(conf,
- inputContext.getUniqueIdentifier());
+ uniqueIdentifier);
this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
// Setup configuration
@@ -80,6 +83,8 @@ public class BroadcastInputManager implements FetchedInputAllocator,
}
this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+
+ LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
}
@Override
@@ -91,12 +96,13 @@ public class BroadcastInputManager implements FetchedInputAllocator,
localDirAllocator, fileNameAllocator);
} else {
this.usedMemory += size;
+ LOG.info("Used memory after allocating " + size + " : " + usedMemory);
return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
}
}
@Override
- public void fetchComplete(FetchedInput fetchedInput) {
+ public synchronized void fetchComplete(FetchedInput fetchedInput) {
switch (fetchedInput.getType()) {
// Not tracking anything here.
case DISK:
@@ -109,12 +115,12 @@ public class BroadcastInputManager implements FetchedInputAllocator,
}
@Override
- public void fetchFailed(FetchedInput fetchedInput) {
+ public synchronized void fetchFailed(FetchedInput fetchedInput) {
cleanup(fetchedInput);
}
@Override
- public void freeResources(FetchedInput fetchedInput) {
+ public synchronized void freeResources(FetchedInput fetchedInput) {
cleanup(fetchedInput);
}
@@ -133,6 +139,7 @@ public class BroadcastInputManager implements FetchedInputAllocator,
private synchronized void unreserve(long size) {
this.usedMemory -= size;
+ LOG.info("Used memory after freeing " + size + " : " + usedMemory);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 16e9645..675d90d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -62,9 +62,10 @@ public class BroadcastKVReader<K, V> implements KVReader {
private FetchedInput currentFetchedInput;
private IFile.Reader currentReader;
+ private int numRecordsRead = 0;
public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
- Configuration conf) {
+ Configuration conf) throws IOException {
this.shuffleManager = shuffleManager;
this.conf = conf;
@@ -77,15 +78,17 @@ public class BroadcastKVReader<K, V> implements KVReader {
}
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
- this.valClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.keyIn = new DataInputBuffer();
this.valIn = new DataInputBuffer();
SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(valIn);
this.valueIterator = new SimpleValueIterator();
this.valueIterable = new SimpleIterable(this.valueIterator);
@@ -104,15 +107,18 @@ public class BroadcastKVReader<K, V> implements KVReader {
@Override
public boolean next() throws IOException {
if (readNextFromCurrentReader()) {
+ numRecordsRead++;
return true;
} else {
boolean nextInputExists = moveToNextInput();
while (nextInputExists) {
if(readNextFromCurrentReader()) {
+ numRecordsRead++;
return true;
}
nextInputExists = moveToNextInput();
}
+ LOG.info("Num Records read: " + numRecordsRead);
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index c64379a..38f6e6c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -34,6 +34,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
public class BroadcastShuffleInputEventHandler {
@@ -72,6 +73,10 @@ public class BroadcastShuffleInputEventHandler {
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
}
+ LOG.info("Processing data moveement event with srcIndex: "
+ + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+ + ", attemptNum: " + dme.getVersion() + ", payload: "
+ + TextFormat.shortDebugString(shufflePayload));
if (shufflePayload.getOutputGenerated()) {
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 2a5c22f..fd47757 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -90,7 +90,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final BlockingQueue<FetchedInput> completedInputs;
private final Set<InputIdentifier> completedInputSet;
- private final Set<InputIdentifier> pendingInputs;
+// private final Set<InputIdentifier> pendingInputs;
private final ConcurrentMap<String, InputHost> knownSrcHosts;
private final Set<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -128,9 +128,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
this.numInputs = numInputs;
this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
- this.inputManager = new BroadcastInputManager(inputContext, conf);
+ this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
- pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
@@ -172,12 +171,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
codec = null;
decompressor = null;
}
+ LOG.info("BroadcastShuffleManager -> numInputs: " + numInputs
+ + " compressionCodec: " + (codec == null ? null : codec.getClass()
+ .getName()) + ", numFetchers: " + numFetchers);
}
public void run() {
RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
runShuffleFuture = new FutureTask<Void>(callable);
- new Thread(runShuffleFuture, "ShuffleRunner");
+ Thread runThread = new Thread(runShuffleFuture, "ShuffleRunner");
+ runThread.setDaemon(true);
+ runThread.start();
}
private class RunBroadcastShuffleCallable implements Callable<Void> {
@@ -186,8 +190,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
public Void call() throws Exception {
while (numCompletedInputs.get() < numInputs) {
if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
- synchronized(lock) {
+ lock.lock();
+ try {
wakeLoop.await();
+ } finally {
+ lock.unlock();
}
if (shuffleError != null) {
// InputContext has already been informed of a fatal error.
@@ -195,8 +202,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
break;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+ }
if (numCompletedInputs.get() < numInputs) {
- synchronized (lock) {
+ lock.lock();
+ try {
int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
int count = 0;
for (Iterator<InputHost> inputHostIter = pendingHosts.iterator() ; inputHostIter.hasNext() ; ) {
@@ -204,6 +215,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
inputHostIter.remove();
if (inputHost.getNumPendingInputs() > 0) {
Fetcher fetcher = constructFetcherForHost(inputHost);
+ LOG.info("Scheduling fetch for inputHost: " + inputHost);
numRunningFetchers.incrementAndGet();
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
@@ -213,6 +225,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
}
}
+ } finally {
+ lock.unlock();
}
}
}
@@ -252,7 +266,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
// TODO NEWTEZ Maybe limit the number of inputs being given to a single
// fetcher, especially in the case where #hosts < #fetchers
fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
- inputHost.clearAndGetPendingInputs());
+ pendingInputsForHost);
return fetcherBuilder.build();
}
@@ -269,9 +283,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
}
host.addKnownInput(srcAttemptIdentifier);
- synchronized(lock) {
+ lock.lock();
+ try {
pendingHosts.add(host);
wakeLoop.signal();
+ } finally {
+ lock.unlock();
}
}
@@ -279,16 +296,24 @@ public class BroadcastShuffleManager implements FetcherCallback {
InputAttemptIdentifier srcAttemptIdentifier) {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
- if (pendingInputs.remove(inputIdentifier)) {
- completedInputSet.add(inputIdentifier);
- completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
- numCompletedInputs.incrementAndGet();
+
+ if (!completedInputSet.contains(inputIdentifier)) {
+ synchronized (completedInputSet) {
+ if (!completedInputSet.contains(inputIdentifier)) {
+ completedInputSet.add(inputIdentifier);
+ completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+ numCompletedInputs.incrementAndGet();
+ }
+ }
}
// Awake the loop to check for termination.
- synchronized (lock) {
+ lock.lock();
+ try {
wakeLoop.signal();
- }
+ } finally {
+ lock.unlock();
+ }
}
public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
@@ -308,14 +333,16 @@ public class BroadcastShuffleManager implements FetcherCallback {
public void fetchSucceeded(String host,
InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
long copyDuration) throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
- }
-
+ InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+ LOG.info("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+
// Count irrespective of whether this is a copy of an already fetched input
- synchronized(lock) {
+ lock.lock();
+ try {
lastProgressTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
}
boolean committed = false;
@@ -324,7 +351,6 @@ public class BroadcastShuffleManager implements FetcherCallback {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
- pendingInputs.remove(inputIdentifier);
completedInputSet.add(inputIdentifier);
completedInputs.add(fetchedInput);
numCompletedInputs.incrementAndGet();
@@ -334,9 +360,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
if (!committed) {
fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
} else {
- synchronized(lock) {
+ lock.lock();
+ try {
// Signal the wakeLoop to check for termination.
wakeLoop.signal();
+ } finally {
+ lock.unlock();
}
}
// TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
@@ -347,6 +376,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
+ LOG.info("Fetch failed for src: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
InputReadErrorEvent readError = new InputReadErrorEvent(
"Fetch failure while fetching from "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -459,8 +489,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
private void doBookKeepingForFetcherComplete() {
numRunningFetchers.decrementAndGet();
- synchronized(lock) {
+ lock.lock();
+ try {
wakeLoop.signal();
+ } finally {
+ lock.unlock();
}
}
@@ -482,7 +515,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
public void onFailure(Throwable t) {
LOG.error("Fetcher failed with error: " + t);
shuffleError = t;
- inputContext.fatalError(t, "Fetched failed");
+ inputContext.fatalError(t, "Fetch failed");
doBookKeepingForFetcherComplete();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 8b19ce0..7d33e63 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.library.broadcast.output;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -40,6 +42,8 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
public class FileBasedKVWriter implements KVWriter {
+ private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
+
public static final int INDEX_RECORD_LENGTH = 24;
private final Configuration conf;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index a682a09..dcf8b6d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -64,7 +64,7 @@ class ShuffleScheduler {
private final Map<Integer, MutableInt> finishedMaps;
private final int numInputs;
private int remainingMaps;
- private Map<InputAttemptIdentifier, MapHost> mapLocations = new HashMap<InputAttemptIdentifier, MapHost>();
+ private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
//TODO NEWTEZ Clean this and other maps at some point
private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
private Set<MapHost> pendingHosts = new HashSet<MapHost>();
@@ -307,7 +307,7 @@ class ShuffleScheduler {
if (host == null) {
host = new MapHost(partitionId, hostName, hostUrl);
assert identifier.equals(host.getIdentifier());
- mapLocations.put(srcAttempt, host);
+ mapLocations.put(identifier, host);
}
host.addKnownMap(srcAttempt);
pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 4ce82d5..18583a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -222,7 +221,7 @@ public class IFile {
" for " + key);
}
- if(keyLength == previous.getLength()) {
+ if(rle && keyLength == previous.getLength()) {
sameKey = (BufferUtils.compare(previous, buffer) == 0);
}
@@ -238,7 +237,7 @@ public class IFile {
valueLength + " for " + value);
}
- if(sameKey) {
+ if(rle && sameKey) {
WritableUtils.writeVInt(out, RLE_MARKER); // Same key as previous
WritableUtils.writeVInt(out, valueLength); // value length
out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 42b2e00..40eff70 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -20,28 +20,36 @@ package org.apache.tez.runtime.library.input;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
import com.google.common.base.Preconditions;
public class ShuffledUnorderedKVInput implements LogicalInput {
+ private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
+
private Configuration conf;
private int numInputs = -1;
private BroadcastShuffleManager shuffleManager;
+ @SuppressWarnings("rawtypes")
+ private BroadcastKVReader kvReader;
public ShuffledUnorderedKVInput() {
}
+ @SuppressWarnings("rawtypes")
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
@@ -49,13 +57,14 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+ this.shuffleManager.run();
+ this.kvReader = new BroadcastKVReader(shuffleManager, conf);
return null;
}
@Override
- public Reader getReader() throws Exception {
- // TODO Auto-generated method stub
- return null;
+ public KVReader getReader() throws Exception {
+ return this.kvReader;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index dd18149..93c00d3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
@@ -33,6 +34,7 @@ import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -68,8 +70,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
- String host = System.getenv(ApplicationConstants.Environment.NM_HOST
- .toString());
+ String host = getHost();
ByteBuffer shuffleMetadata = outputContext
.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
int shufflePort = ShuffleUtils
@@ -94,5 +95,11 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
Preconditions.checkArgument(numOutputs == 1,
"Number of outputs can only be 1 for " + this.getClass().getName());
}
+
+ @VisibleForTesting
+ @Private
+ String getHost() {
+ return System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index ab7e5ba..1b5a8b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -28,6 +28,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -93,7 +94,7 @@ public class Fetcher implements Callable<FetchResult> {
private int partition;
// Maps from the pathComponents (unique per srcTaskId) to the specific taskId
- private Map<String, InputAttemptIdentifier> pathToAttemptMap;
+ private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
private Set<InputAttemptIdentifier> remaining;
private URL url;
@@ -108,6 +109,7 @@ public class Fetcher implements Callable<FetchResult> {
this.shuffleSecret = shuffleSecret;
this.appId = appId;
this.conf = conf;
+ this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 66605dd..4759a8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -87,4 +87,11 @@ public class InputHost {
return false;
return true;
}
+
+ @Override
+ public String toString() {
+ return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
+ + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
index f56877f..e34301e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
@@ -73,7 +73,7 @@ public class MemoryFetchedInput extends FetchedInput {
Preconditions.checkState(
state == State.COMMITTED || state == State.ABORTED,
"FetchedInput can only be freed after it is committed or aborted");
- if (state == State.COMMITTED) {
+ if (state == State.COMMITTED) { // ABORTED would have already called cleanup
state = State.FREED;
this.byteStream = null;
notifyFreedResource();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
new file mode 100644
index 0000000..e6603c4
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.junit.Test;
+
+/**
+ * Tests how {@link BroadcastInputManager} chooses between memory and disk
+ * backed {@link FetchedInput}s as allocations are made and released.
+ */
+public class TestBroadcastInputManager {
+
+  private static final Log LOG = LogFactory.getLog(TestBroadcastInputManager.class);
+
+  /**
+   * Requests inputs sized at 40% of the in-memory threshold computed by this
+   * test and checks the type of each allocation: the first two must be
+   * MEMORY and the third DISK. After a MEMORY input is aborted and freed the
+   * next allocation must be MEMORY again; after a DISK input is aborted and
+   * freed the next allocation must still be DISK.
+   */
+  @Test
+  public void testInMemAllocation() throws IOException {
+    String localDirs = "/tmp/" + this.getClass().getName();
+    Configuration conf = new Configuration();
+
+    long jvmMax = Runtime.getRuntime().maxMemory();
+    LOG.info("jvmMax: " + jvmMax);
+
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+
+    long inMemThreshold = (long) (bufferPercent * jvmMax);
+    LOG.info("InMemThreshold: " + inMemThreshold);
+
+    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(), conf);
+
+    long requestSize = (long) (0.4f * inMemThreshold);
+    LOG.info("RequestSize: " + requestSize);
+
+    FetchedInput fi1 = inputManager.allocate(requestSize, new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+
+    FetchedInput fi2 = inputManager.allocate(requestSize, new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+
+    // Over limit by this point. Next reserve should give back a DISK allocation
+    FetchedInput fi3 = inputManager.allocate(requestSize, new InputAttemptIdentifier(3, 1));
+    assertEquals(FetchedInput.Type.DISK, fi3.getType());
+
+    // Freed one memory allocation. Next should be mem again.
+    fi1.abort();
+    fi1.free();
+    FetchedInput fi4 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+
+    // Freed one disk allocation. Next should be disk again (no mem freed)
+    fi3.abort();
+    fi3.free();
+    // Each allocation gets its own input identifier; 4 is already used by fi4.
+    FetchedInput fi5 = inputManager.allocate(requestSize, new InputAttemptIdentifier(5, 1));
+    assertEquals(FetchedInput.Type.DISK, fi5.getType());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
new file mode 100644
index 0000000..019fd0e
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.testutils.KVDataGen;
+import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Writes key/value data with {@link IFile.Writer} and checks that it can be
+ * read back unchanged, both from the file and from an in-memory copy of it.
+ */
+public class TestIFile {
+
+  private static final Log LOG = LogFactory.getLog(TestIFile.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")), TestIFile.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Removes the work directory before and after every test. */
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  @Test
+  public void testRepeatedKeysInMemReaderNoRLE() throws IOException {
+    verifyWithInMemReader(false);
+  }
+
+  @Test
+  public void testRepeatedKeysFileReaderNoRLE() throws IOException {
+    verifyWithFileReader(false);
+  }
+
+  @Ignore // TEZ-500
+  @Test
+  public void testRepeatedKeysInMemReaderRLE() throws IOException {
+    verifyWithInMemReader(true);
+  }
+
+  @Ignore // TEZ-500
+  @Test
+  public void testRepeatedKeysFileReaderRLE() throws IOException {
+    verifyWithFileReader(true);
+  }
+
+  /**
+   * Writes the repeated-key test data to a file, loads the file into a byte
+   * array and verifies the records through an {@link InMemoryReader}.
+   *
+   * @param useRle value passed to the writer's setRLE
+   */
+  private void verifyWithInMemReader(boolean useRle) throws IOException {
+    Path outputPath = new Path(workDir, "ifile.out");
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    Writer writer = writeTestFile(outputPath, useRle, data);
+
+    byte[] bytes = new byte[(int) writer.getRawLength()];
+    FSDataInputStream inStream = localFs.open(outputPath);
+    try {
+      readDataToMem(inStream, bytes);
+    } finally {
+      // Close the stream even when the read fails.
+      inStream.close();
+    }
+
+    InMemoryReader inMemReader = new InMemoryReader(null, new InputAttemptIdentifier(0, 0),
+        bytes, 0, bytes.length);
+    readAndVerify(inMemReader, data);
+  }
+
+  /**
+   * Writes the repeated-key test data to a file and verifies the records
+   * through an {@link IFile.Reader} opened on that file.
+   *
+   * @param useRle value passed to the writer's setRLE
+   */
+  private void verifyWithFileReader(boolean useRle) throws IOException {
+    Path outputPath = new Path(workDir, "ifile.out");
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    writeTestFile(outputPath, useRle, data);
+
+    IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+    try {
+      readAndVerify(reader, data);
+    } finally {
+      // Close the reader even when verification fails.
+      reader.close();
+    }
+  }
+
+  /**
+   * Fills {@code bytes} completely from the stream.
+   *
+   * @throws IOException if the stream ends before the array is full
+   */
+  private void readDataToMem(FSDataInputStream inStream, byte[] bytes) throws IOException {
+    int toRead = bytes.length;
+    int offset = 0;
+    while (toRead > 0) {
+      int ret = inStream.read(bytes, offset, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      offset += ret;
+    }
+    LOG.info("Read: " + bytes.length + " bytes");
+  }
+
+  /**
+   * Reads every record from {@code reader} as a Text key and IntWritable value
+   * and asserts that the records match {@code data} in order and in number.
+   */
+  private void readAndVerify(Reader reader, List<KVPair> data)
+      throws IOException {
+    Text readKey = new Text();
+    IntWritable readValue = new IntWritable();
+    DataInputBuffer keyIn = new DataInputBuffer();
+    DataInputBuffer valIn = new DataInputBuffer();
+    Deserializer<Text> keyDeserializer;
+    Deserializer<IntWritable> valDeserializer;
+    SerializationFactory serializationFactory = new SerializationFactory(
+        defaultConf);
+    keyDeserializer = serializationFactory.getDeserializer(Text.class);
+    valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
+    keyDeserializer.open(keyIn);
+    valDeserializer.open(valIn);
+
+    int numRecordsRead = 0;
+
+    while (reader.nextRawKey(keyIn)) {
+      reader.nextRawValue(valIn);
+      readKey = keyDeserializer.deserialize(readKey);
+      readValue = valDeserializer.deserialize(readValue);
+
+      // Fail with a clear message instead of an IndexOutOfBoundsException
+      // when the reader yields more records than were written.
+      if (numRecordsRead >= data.size()) {
+        throw new AssertionError("Expected: " + data.size() + " records, but the reader returned more");
+      }
+      KVPair expected = data.get(numRecordsRead);
+      assertEquals("Key does not match: Expected: " + expected.getKey()
+          + ", Read: " + readKey, expected.getKey(), readKey);
+      assertEquals("Value does not match: Expected: " + expected.getvalue()
+          + ", Read: " + readValue, expected.getvalue(), readValue);
+
+      numRecordsRead++;
+    }
+    assertEquals("Expected: " + data.size() + " records, but found: "
+        + numRecordsRead, data.size(), numRecordsRead);
+    LOG.info("Found: " + numRecordsRead + " records");
+  }
+
+  /**
+   * Writes {@code data} to {@code outputPath} with an {@link IFile.Writer} and
+   * closes the writer.
+   *
+   * @return the closed writer, from which the written lengths can be read
+   */
+  private Writer writeTestFile(Path outputPath, boolean useRle, List<KVPair> data)
+      throws IOException {
+
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, null, null);
+    writer.setRLE(useRle);
+
+    for (KVPair kvp : data) {
+      writer.append(kvp.getKey(), kvp.getvalue());
+    }
+
+    writer.close();
+
+    LOG.info("Uncompressed: " + writer.getRawLength());
+    LOG.info("CompressedSize: " + writer.getCompressedLength());
+
+    return writer;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
new file mode 100644
index 0000000..ff9afbd
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.testutils.KVDataGen;
+import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks the events produced by {@link OnFileUnorderedKVOutput} over its
+ * initialize / write / close lifecycle.
+ */
+public class TestOnFileUnorderedKVOutput {
+
+  private static final Log LOG = LogFactory.getLog(TestOnFileUnorderedKVOutput.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      Path baseDir = new Path(System.getProperty("test.build.data", "/tmp"));
+      workDir = new Path(baseDir, TestOnFileUnorderedKVOutput.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Deletes the work directory before and after each test. */
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  /**
+   * Writes the generated test data through the output and expects no events
+   * from initialize and a single {@link DataMovementEvent} from close whose
+   * payload carries the stubbed host, the configured shuffle port and the
+   * context's unique identifier.
+   */
+  @Test
+  public void testGeneratedDataMovementEvent() throws Exception {
+    OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest();
+
+    Configuration conf = new Configuration();
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, IntWritable.class.getName());
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, workDir.toString());
+
+    TezTaskID taskID = new TezTaskID(new TezVertexID(new TezDAGID("2000", 1, 1), 1), 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 1);
+    TezUmbilical tezUmbilical = null; // TODO: TestUmbilical from mapreduce
+    RuntimeTask runtimeTask = null;
+    TezCounters counters = new TezCounters();
+    byte[] userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+    final int shufflePort = 2112;
+    Map<String, String> auxEnv = shuffleServiceEnv(shufflePort);
+
+    TezOutputContext outputContext = new TezOutputContextImpl(conf,
+        1, tezUmbilical, "currentVertex", "destinationVertex",
+        taskAttemptID, counters, userPayload, runtimeTask,
+        null, auxEnv);
+
+    List<Event> initEvents = kvOutput.initialize(outputContext);
+    assertTrue(initEvents != null && initEvents.isEmpty());
+
+    KVWriter kvWriter = kvOutput.getWriter();
+    for (KVPair pair : KVDataGen.generateTestData(true)) {
+      kvWriter.write(pair.getKey(), pair.getvalue());
+    }
+
+    List<Event> closeEvents = kvOutput.close();
+    assertTrue(closeEvents != null && closeEvents.size() == 1);
+    DataMovementEvent dmEvent = (DataMovementEvent) closeEvents.get(0);
+    assertEquals("Invalid source index", 0, dmEvent.getSourceIndex());
+
+    DataMovementEventPayloadProto shufflePayload =
+        DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
+    assertTrue(shufflePayload.getOutputGenerated());
+    assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
+    assertEquals(shufflePort, shufflePayload.getPort());
+    assertEquals("host", shufflePayload.getHost());
+  }
+
+  /**
+   * Builds an environment map holding the shuffle handler's service data: the
+   * given port written as a 4-byte int.
+   */
+  private static Map<String, String> shuffleServiceEnv(int shufflePort) {
+    Map<String, String> env = new HashMap<String, String>();
+    ByteBuffer portBytes = ByteBuffer.allocate(4);
+    portBytes.putInt(shufflePort);
+    portBytes.rewind();
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, portBytes, env);
+    return env;
+  }
+
+  /** Output whose host name is fixed to "host" so the payload can be asserted. */
+  private static class OnFileUnorderedKVOutputForTest extends OnFileUnorderedKVOutput {
+    @Override
+    String getHost() {
+      return "host";
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
new file mode 100644
index 0000000..90bb0b3
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.testutils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Generates a small, fixed set of {@link Text}/{@link IntWritable} pairs for tests.
+ */
+public class KVDataGen {
+
+  /**
+   * Builds pairs for the keys "key0" through "key4", in that order.
+   *
+   * @param repeat if true, "key2" is added twice in a row; the second pair
+   *          carries the next higher value, and the values of all later keys
+   *          are shifted up by one
+   * @return a new list of pairs, each holding its own value object
+   */
+  public static List<KVPair> generateTestData(boolean repeat) {
+    List<KVPair> data = new LinkedList<KVPair>();
+    int repeatCount = 0;
+    for (int i = 0; i < 5; i++) {
+      Text key = new Text("key" + i);
+      data.add(new KVPair(key, new IntWritable(i + repeatCount)));
+      if (repeat && i == 2) { // Repeat this key
+        repeatCount++;
+        // The repeated pair needs its own IntWritable: reusing and mutating the
+        // previous one would also change the value of the pair already added.
+        data.add(new KVPair(key, new IntWritable(i + repeatCount)));
+      }
+    }
+    return data;
+  }
+
+  /** A key together with its value. */
+  public static class KVPair {
+    private final Text key;
+    private final IntWritable value;
+
+    public KVPair(Text key, IntWritable value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public Text getKey() {
+      return this.key;
+    }
+
+    public IntWritable getvalue() {
+      return this.value;
+    }
+  }
+}
\ No newline at end of file