You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/22 07:40:58 UTC
git commit: TEZ-1499. Add SortMergeJoinExample to tez-examples (Jeff
Zhang via bikas)
Repository: tez
Updated Branches:
refs/heads/master 55ae1e57c -> 4023898c1
TEZ-1499. Add SortMergeJoinExample to tez-examples (Jeff Zhang via bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4023898c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4023898c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4023898c
Branch: refs/heads/master
Commit: 4023898c109b8e50e71fa08eacdf294b66d162d1
Parents: 55ae1e5
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Sep 21 22:40:47 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Sep 21 22:40:47 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tez/examples/ExampleDriver.java | 6 +-
.../apache/tez/examples/HashJoinExample.java | 403 +++++++++++++++++++
.../org/apache/tez/examples/JoinDataGen.java | 1 -
.../org/apache/tez/examples/JoinExample.java | 364 -----------------
.../org/apache/tez/examples/JoinValidate.java | 4 +-
.../tez/examples/SortMergeJoinExample.java | 374 +++++++++++++++++
.../java/org/apache/tez/test/TestTezJobs.java | 144 ++++++-
8 files changed, 913 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fe7ff0..407acc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
TEZ-1578. Remove TeraSort from Tez codebase.
+ TEZ-1499. Add SortMergeJoinExample to tez-examples
ALL CHANGES:
TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
@@ -14,6 +15,7 @@ ALL CHANGES:
TEZ-853. Support counters recovery.
TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
+ TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
TEZ-1578. Remove TeraSort from Tez codebase.
TEZ-1569. Add tests for preemption
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
@@ -21,6 +23,7 @@ ALL CHANGES:
TEZ-1581. GroupByOrderByMRRTest no longer functional.
TEZ-1157. Optimize broadcast shuffle to download data only once per host.
TEZ-1607. support mr envs in mrrsleep and testorderedwordcount
+ TEZ-1499. Add SortMergeJoinExample to tez-examples
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
index 6f689e2..5394cc1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
@@ -50,8 +50,10 @@ public class ExampleDriver {
"Word Count with words sorted on frequency");
pgd.addClass("simplesessionexample", SimpleSessionExample.class,
"Example to run multiple dags in a session");
- pgd.addClass("joinexample", JoinExample.class,
- "Identify all occurences of lines in file1 which also occur in file2");
+ pgd.addClass("hashjoin", HashJoinExample.class,
+ "Identify all occurences of lines in file1 which also occur in file2 using hash join");
+ pgd.addClass("sortmergejoin", SortMergeJoinExample.class,
+ "Identify all occurences of lines in file1 which also occur in file2 using sort merge join");
pgd.addClass("joindatagen", JoinDataGen.class,
"Generate data to run the joinexample");
pgd.addClass("joinvalidate", JoinValidate.class,
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
new file mode 100644
index 0000000..76e53e1
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple example of joining 2 data sets using <a
+ * href="http://en.wikipedia.org/wiki/Hash_join">Hash Join</a>.<br>
+ * The example shows a vertex with multiple inputs that represent the two data
+ * sets that need to be joined. This HashJoinExample assumes that the keys in the
+ * second dataset (hashSide) are unique.<br>
+ * The join can be performed using a broadcast (or replicate-fragment) join in
+ * which the small side of the join is broadcast in total to fragments of the
+ * larger side. Each fragment of the larger side can perform the join operation
+ * independently using the full data of the smaller side. This shows the usage
+ * of the broadcast edge property in Tez. <br>
+ * The join can be performed using the regular repartition join where both sides
+ * are partitioned according to the same scheme into the same number of
+ * fragments. Then the keys in the same fragment are joined with each other.
+ * This is the default join strategy.
+ */
+public class HashJoinExample extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(HashJoinExample.class);
+
+ private static final String broadcastOption = "doBroadcast";
+ private static final String streamingSide = "streamingSide";
+ private static final String hashSide = "hashSide";
+ private static final String inputFile = "inputFile";
+ private static final String joiner = "joiner";
+ private static final String joinOutput = "joinOutput";
+
+ public static void main(String[] args) throws Exception {
+ HashJoinExample job = new HashJoinExample();
+ int status = ToolRunner.run(new Configuration(), job, args);
+ System.exit(status);
+ }
+
+ private static void printUsage() {
+ System.err.println("Usage: "
+ + "hashjoin <file1> <file2> <numPartitions> <outPath> ["
+ + broadcastOption + "(default false)]");
+ ToolRunner.printGenericCommandUsage(System.err);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Configuration conf = getConf();
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs);
+ }
+
+ public int run(Configuration conf, String[] args, TezClient tezClient)
+ throws Exception {
+ setConf(conf);
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs, tezClient);
+ }
+
+ private int validateArgs(String[] otherArgs) {
+ if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
+ printUsage();
+ return 2;
+ }
+ return 0;
+ }
+
+ private int execute(String[] args) throws TezException, IOException,
+ InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ TezClient tezClient = null;
+ try {
+ tezClient = createTezClient(tezConf);
+ return execute(args, tezConf, tezClient);
+ } finally {
+ if (tezClient != null) {
+ tezClient.stop();
+ }
+ }
+ }
+
+ private int execute(String[] args, TezClient tezClient) throws IOException,
+ TezException, InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ return execute(args, tezConf, tezClient);
+ }
+
+ private TezClient createTezClient(TezConfiguration tezConf)
+ throws TezException, IOException {
+ TezClient tezClient = TezClient.create("HashJoinExample", tezConf);
+ tezClient.start();
+ return tezClient;
+ }
+
+ private int execute(String[] args, TezConfiguration tezConf,
+ TezClient tezClient) throws IOException, TezException,
+ InterruptedException {
+ boolean doBroadcast =
+ args.length == 5 && args[4].equals(broadcastOption) ? true : false;
+ LOG.info("Running HashJoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
+
+ UserGroupInformation.setConfiguration(tezConf);
+
+ String streamInputDir = args[0];
+ String hashInputDir = args[1];
+ int numPartitions = Integer.parseInt(args[2]);
+ String outputDir = args[3];
+
+ Path streamInputPath = new Path(streamInputDir);
+ Path hashInputPath = new Path(hashInputDir);
+ Path outputPath = new Path(outputDir);
+
+ // Verify output path existence
+ FileSystem fs = FileSystem.get(tezConf);
+ if (fs.exists(outputPath)) {
+ System.err.println("Output directory: " + outputDir + " already exists");
+ return 3;
+ }
+ if (numPartitions <= 0) {
+ System.err.println("NumPartitions must be > 0");
+ return 4;
+ }
+
+ DAG dag =
+ createDag(tezConf, streamInputPath, hashInputPath, outputPath,
+ numPartitions, doBroadcast);
+
+ tezClient.waitTillReady();
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ return -1;
+ }
+ return 0;
+
+ }
+
+ private DAG createDag(TezConfiguration tezConf, Path streamPath,
+ Path hashPath, Path outPath, int numPartitions, boolean doBroadcast)
+ throws IOException {
+ DAG dag = DAG.create("HashJoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
+
+ /**
+ * This vertex represents the side of the join that will be accumulated in a
+ * hash table in order to join it against the other side. It reads text data
+ * using the TextInputFormat. ForwardingProcessor simply forwards the data
+ * downstream as is.
+ */
+ Vertex hashFileVertex =
+ Vertex.create(hashSide,
+ ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+ .addDataSource(
+ inputFile,
+ MRInput
+ .createConfigBuilder(new Configuration(tezConf),
+ TextInputFormat.class, hashPath.toUri().toString())
+ .groupSplits(false).build());
+
+ /**
+ * This vertex represents that side of the data that will be streamed and
+ * joined against the other side that has been accumulated into a hash
+ * table. It reads text data using the TextInputFormat. ForwardingProcessor
+ * simply forwards the data downstream as is.
+ */
+ Vertex streamFileVertex =
+ Vertex.create(streamingSide,
+ ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+ .addDataSource(
+ inputFile,
+ MRInput
+ .createConfigBuilder(new Configuration(tezConf),
+ TextInputFormat.class, streamPath.toUri().toString())
+ .groupSplits(false).build());
+
+ /**
+ * This vertex represents the join operation. It writes the join output as
+ * text using the TextOutputFormat. The HashJoinProcessor is going to perform
+ * the join of the streaming side and the hash side. It is load balanced
+ * across numPartitions
+ */
+ Vertex joinVertex =
+ Vertex.create(joiner,
+ ProcessorDescriptor.create(HashJoinProcessor.class.getName()),
+ numPartitions).addDataSink(
+ joinOutput,
+ MROutput.createConfigBuilder(new Configuration(tezConf),
+ TextOutputFormat.class, outPath.toUri().toString()).build());
+
+ /**
+ * The streamed side will be partitioned into fragments with the same keys
+ * going to the same fragments using hash partitioning. The data to be
+ * joined is the key itself and so the value is null. The number of
+ * fragments is initially inferred from the number of tasks running in the
+ * join vertex because each task will be handling one fragment.
+ */
+ UnorderedPartitionedKVEdgeConfig streamConf =
+ UnorderedPartitionedKVEdgeConfig
+ .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+ HashPartitioner.class.getName()).setFromConfiguration(tezConf)
+ .build();
+
+ /**
+ * Connect the join vertex with the stream side
+ */
+ Edge e1 =
+ Edge.create(streamFileVertex, joinVertex,
+ streamConf.createDefaultEdgeProperty());
+
+ EdgeProperty hashSideEdgeProperty = null;
+ if (doBroadcast) {
+ /**
+ * This option can be used when the hash side is small. We can broadcast
+ * the entire data to all fragments of the stream side. This avoids
+ * re-partitioning the fragments of the stream side to match the
+ * partitioning scheme of the hash side and avoids costly network data
+ * transfer. However, in this example the stream side is being partitioned
+ * in both cases for brevity of code. The join task can perform the join
+ * of its fragment of keys with all the keys of the hash side. Using an
+ * unpartitioned edge to transfer the complete output of the hash side to
+ * be broadcasted to all fragments of the streamed side. Again, since the
+ * data is the key, the value is null.
+ */
+ UnorderedKVEdgeConfig broadcastConf =
+ UnorderedKVEdgeConfig
+ .newBuilder(Text.class.getName(), NullWritable.class.getName())
+ .setFromConfiguration(tezConf).build();
+ hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
+ } else {
+ /**
+ * The hash side is also being partitioned into fragments with the same
+ * key going to the same fragment using hash partitioning. This way all
+ * keys with the same hash value will go to the same fragment from both
+ * sides. Thus the join task handling that fragment can join both data set
+ * fragments.
+ */
+ hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
+ }
+
+ /**
+ * Connect the join vertex to the hash side. The join vertex is connected
+ * with 2 upstream vertices that provide it with inputs
+ */
+ Edge e2 = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty);
+
+ /**
+ * Connect everything up by adding them to the DAG
+ */
+ dag.addVertex(streamFileVertex).addVertex(hashFileVertex)
+ .addVertex(joinVertex).addEdge(e1).addEdge(e2);
+ return dag;
+ }
+
+ /**
+ * Reads key-values from the source and forwards the value as the key for the
+ * output
+ */
+ public static class ForwardingProcessor extends SimpleProcessor {
+ public ForwardingProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ Preconditions.checkState(getInputs().size() == 1);
+ Preconditions.checkState(getOutputs().size() == 1);
+ // not looking up inputs and outputs by name because there is just one
+ // instance and this processor is used in many different DAGs with
+ // different names for inputs and outputs
+ LogicalInput input = getInputs().values().iterator().next();
+ Reader rawReader = input.getReader();
+ Preconditions.checkState(rawReader instanceof KeyValueReader);
+ LogicalOutput output = getOutputs().values().iterator().next();
+
+ KeyValueReader reader = (KeyValueReader) rawReader;
+ KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+
+ while (reader.next()) {
+ Object val = reader.getCurrentValue();
+ // The data value itself is the join key. Simply write it out as the
+ // key.
+ // The output value is null.
+ writer.write(val, NullWritable.get());
+ }
+ }
+ }
+
+ /**
+ * Join 2 inputs using Hash Join algorithm. Check the algorithm here <a
+ * href="http://en.wikipedia.org/wiki/Hash_join">Hash Join</a> <br>
+ * It outputs all occurrences of keys in the streamFile which also exist
+ * in the hashFile. This requires that the keys in the hashFile be unique.
+ * <br>Disclaimer: The join code here is written as a tutorial for the APIs and
+ * not for performance.
+ */
+ public static class HashJoinProcessor extends SimpleMRProcessor {
+
+ public HashJoinProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ Preconditions.checkState(getInputs().size() == 2);
+ Preconditions.checkState(getOutputs().size() == 1);
+ // Get the input data for the 2 sides of the join from the 2 inputs
+ LogicalInput streamInput = getInputs().get(streamingSide);
+ LogicalInput hashInput = getInputs().get(hashSide);
+ Reader rawStreamReader = streamInput.getReader();
+ Reader rawHashReader = hashInput.getReader();
+ Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
+ Preconditions.checkState(rawHashReader instanceof KeyValueReader);
+ LogicalOutput lo = getOutputs().get(joinOutput);
+ Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
+ KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
+
+ // create a hash table for the hash side
+ KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
+ Set<Text> keySet = new HashSet<Text>();
+ while (hashKvReader.next()) {
+ keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
+ }
+
+ // read the stream side and join it using the hash table
+ KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
+ while (streamKvReader.next()) {
+ Text key = (Text) streamKvReader.getCurrentKey();
+ if (keySet.contains(key)) {
+ writer.write(key, NullWritable.get());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
index 8231b6f..ff73247 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
@@ -73,7 +73,6 @@ public class JoinDataGen extends Configured implements Tool {
System.err
.println("Usage: "
+ "joindatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
- ;
ToolRunner.printGenericCommandUsage(System.err);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
deleted file mode 100644
index 3611fd6..0000000
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.examples;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Simple example of joining 2 data sets.
- * <br>The example shows a vertex with multiple inputs that represent the two
- * data sets that need to be joined.
- * <br>The join can be performed using a broadcast (or replicate-fragment) join in
- * which the small side of the join is broadcast in total to fragments of the
- * larger side. Each fragment of the larger side can perform the join operation
- * independently using the full data of the smaller side. This shows the usage
- * of the broadcast edge property in Tez.
- * <br>The join can be performed using the regular repartition join where both
- * sides are partitioned according to the same scheme into the same number of
- * fragments. Then the keys in the same fragment are joined with each other. This
- * is the default join strategy.
- *
- */
-public class JoinExample extends Configured implements Tool {
-
- private static final Log LOG = LogFactory.getLog(JoinExample.class);
-
- private static final String broadcastOption = "doBroadcast";
- private static final String streamingSide = "streamingSide";
- private static final String hashSide = "hashSide";
- private static final String inputFile = "inputFile";
- private static final String joiner = "joiner";
- private static final String joinOutput = "joinOutput";
-
-
- public static void main(String[] args) throws Exception {
- JoinExample job = new JoinExample();
- int status = ToolRunner.run(new Configuration(), job, args);
- System.exit(status);
- }
-
- private static void printUsage() {
- System.err.println("Usage: " + "joinexample <file1> <file2> <numPartitions> <outPath> ["
- + broadcastOption + "(default false)]");
- ToolRunner.printGenericCommandUsage(System.err);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- int result = validateArgs(otherArgs);
- if (result != 0) {
- return result;
- }
- return execute(otherArgs);
- }
-
- public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
- setConf(conf);
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- int result = validateArgs(otherArgs);
- if (result != 0) {
- return result;
- }
- return execute(otherArgs, tezClient);
- }
-
- private int validateArgs(String[] otherArgs) {
- if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
- printUsage();
- return 2;
- }
- return 0;
- }
-
- private int execute(String[] args) throws TezException, IOException, InterruptedException {
- TezConfiguration tezConf = new TezConfiguration(getConf());
- TezClient tezClient = null;
- try {
- tezClient = createTezClient(tezConf);
- return execute(args, tezConf, tezClient);
- } finally {
- if (tezClient != null) {
- tezClient.stop();
- }
- }
- }
-
- private int execute(String[] args, TezClient tezClient) throws IOException, TezException,
- InterruptedException {
- TezConfiguration tezConf = new TezConfiguration(getConf());
- return execute(args, tezConf, tezClient);
- }
-
- private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
- TezClient tezClient = TezClient.create("JoinExample", tezConf);
- tezClient.start();
- return tezClient;
- }
-
- private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient)
- throws IOException, TezException, InterruptedException {
- boolean doBroadcast = args.length == 5 && args[4].equals(broadcastOption) ? true : false;
- LOG.info("Running JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
-
- UserGroupInformation.setConfiguration(tezConf);
-
- String streamInputDir = args[0];
- String hashInputDir = args[1];
- int numPartitions = Integer.parseInt(args[2]);
- String outputDir = args[3];
-
- Path streamInputPath = new Path(streamInputDir);
- Path hashInputPath = new Path(hashInputDir);
- Path outputPath = new Path(outputDir);
-
- // Verify output path existence
- FileSystem fs = FileSystem.get(tezConf);
- if (fs.exists(outputPath)) {
- System.err.println("Output directory: " + outputDir + " already exists");
- return 3;
- }
- if (numPartitions <= 0) {
- System.err.println("NumPartitions must be > 0");
- return 4;
- }
-
- DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions, doBroadcast);
-
- tezClient.waitTillReady();
- DAGClient dagClient = tezClient.submitDAG(dag);
- DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
- return -1;
- }
- return 0;
-
- }
-
- private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
- int numPartitions, boolean doBroadcast) throws IOException {
- DAG dag = DAG.create("JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
-
- /**
- * This vertex represents the side of the join that will be accumulated in a hash
- * table in order to join it against the other side. It reads text data using the
- * TextInputFormat. ForwardingProcessor simply forwards the data downstream as is.
- */
- Vertex hashFileVertex = Vertex.create(hashSide, ProcessorDescriptor.create(
- ForwardingProcessor.class.getName())).addDataSource(
- inputFile,
- MRInput
- .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
- hashPath.toUri().toString()).groupSplits(false).build());
-
- /**
- * This vertex represents that side of the data that will be streamed and joined
- * against the other side that has been accumulated into a hash table. It reads
- * text data using the TextInputFormat. ForwardingProcessor simply forwards the data
- * downstream as is.
- */
- Vertex streamFileVertex = Vertex.create(streamingSide, ProcessorDescriptor.create(
- ForwardingProcessor.class.getName())).addDataSource(
- inputFile,
- MRInput
- .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
- streamPath.toUri().toString()).groupSplits(false).build());
-
- /**
- * This vertex represents the join operation. It writes the join output as text using
- * the TextOutputFormat. The JoinProcessor is going to perform the join of the
- * streaming side and the hash side. It is load balanced across numPartitions
- */
- Vertex joinVertex = Vertex.create(joiner, ProcessorDescriptor.create(
- JoinProcessor.class.getName()), numPartitions).addDataSink(joinOutput,
- MROutput.createConfigBuilder(new Configuration(tezConf),
- TextOutputFormat.class, outPath.toUri().toString()).build());
-
- /**
- * The streamed side will be partitioned into fragments with the same keys going to
- * the same fragments using hash partitioning. The data to be joined is the key itself
- * and so the value is null. The number of fragments is initially inferred from the
- * number of tasks running in the join vertex because each task will be handling one
- * fragment.
- */
- UnorderedPartitionedKVEdgeConfig streamConf =
- UnorderedPartitionedKVEdgeConfig
- .newBuilder(Text.class.getName(), NullWritable.class.getName(),
- HashPartitioner.class.getName()).setFromConfiguration(tezConf).build();
-
- /**
- * Connect the join vertex with the stream side
- */
- Edge e1 = Edge.create(streamFileVertex, joinVertex, streamConf.createDefaultEdgeProperty());
-
- EdgeProperty hashSideEdgeProperty = null;
- if (doBroadcast) {
- /**
- * This option can be used when the hash side is small. We can broadcast the entire data to
- * all fragments of the stream side. This avoids re-partitioning the fragments of the stream
- * side to match the partitioning scheme of the hash side and avoids costly network data
- * transfer. However, in this example the stream side is being partitioned in both cases for
- * brevity of code. The join task can perform the join of its fragment of keys with all the
- * keys of the hash side.
- * Using an unpartitioned edge to transfer the complete output of the hash side to be
- * broadcasted to all fragments of the streamed side. Again, since the data is the key, the
- * value is null.
- */
- UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig.newBuilder(Text.class.getName(),
- NullWritable.class.getName()).setFromConfiguration(tezConf).build();
- hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
- } else {
- /**
- * The hash side is also being partitioned into fragments with the same key going to the same
- * fragment using hash partitioning. This way all keys with the same hash value will go to the
- * same fragment from both sides. Thus the join task handling that fragment can join both data
- * set fragments.
- */
- hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
- }
-
- /**
- * Connect the join vertex to the hash side.
- * The join vertex is connected with 2 upstream vertices that provide it with inputs
- */
- Edge e2 = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty);
-
- /**
- * Connect everything up by adding them to the DAG
- */
- dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(joinVertex)
- .addEdge(e1).addEdge(e2);
- return dag;
- }
-
- /**
- * Reads key-values from the source and forwards the value as the key for the output
- */
- public static class ForwardingProcessor extends SimpleProcessor {
- public ForwardingProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void run() throws Exception {
- Preconditions.checkState(getInputs().size() == 1);
- Preconditions.checkState(getOutputs().size() == 1);
- // not looking up inputs and outputs by name because there is just one
- // instance and this processor is used in many different DAGs with
- // different names for inputs and outputs
- LogicalInput input = getInputs().values().iterator().next();
- Reader rawReader = input.getReader();
- Preconditions.checkState(rawReader instanceof KeyValueReader);
- LogicalOutput output = getOutputs().values().iterator().next();
-
- KeyValueReader reader = (KeyValueReader) rawReader;
- KeyValueWriter writer = (KeyValueWriter) output.getWriter();
-
- while (reader.next()) {
- Object val = reader.getCurrentValue();
- // The data value itself is the join key. Simply write it out as the key.
- // The output value is null.
- writer.write(val, NullWritable.get());
- }
- }
- }
-
- public static class JoinProcessor extends SimpleMRProcessor {
-
- public JoinProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void run() throws Exception {
- Preconditions.checkState(getInputs().size() == 2);
- Preconditions.checkState(getOutputs().size() == 1);
- // Get the input data for the 2 sides of the join from the 2 inputs
- LogicalInput streamInput = getInputs().get(streamingSide);
- LogicalInput hashInput = getInputs().get(hashSide);
- Reader rawStreamReader = streamInput.getReader();
- Reader rawHashReader = hashInput.getReader();
- Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
- Preconditions.checkState(rawHashReader instanceof KeyValueReader);
- LogicalOutput lo = getOutputs().get(joinOutput);
- Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
- KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
-
- // create a hash table for the hash side
- KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
- Set<Text> keySet = new HashSet<Text>();
- while (hashKvReader.next()) {
- keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
- }
-
- // read the stream side and join it using the hash table
- KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
- while (streamKvReader.next()) {
- Text key = (Text) streamKvReader.getCurrentKey();
- if (keySet.contains(key)) {
- writer.write(key, NullWritable.get());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index e94df09..17e3dc8 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -43,7 +43,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.examples.JoinExample.ForwardingProcessor;
+import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.Reader;
@@ -57,7 +57,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
public class JoinValidate extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(JoinExample.class);
+ private static final Log LOG = LogFactory.getLog(JoinValidate.class);
private static final String LHS_INPUT_NAME = "lhsfile";
private static final String RHS_INPUT_NAME = "rhsfile";
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
new file mode 100644
index 0000000..b9ba87a
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple example of joining 2 data sets using <a
+ * href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge Join</a><br>
+ * There are 2 differences between {@link SortMergeJoinExample} and
+ * {@link HashJoinExample}. <li>{@link HashJoinExample} always loads one data
+ * set (hashFile) into memory, which requires that data set to be small enough
+ * to fit into memory. {@link SortMergeJoinExample} does not load a data set
+ * into memory; it just sorts the output of both data sets before feeding them
+ * to {@link SortMergeJoinProcessor}, just like the sort phase before reduce in
+ * traditional MapReduce. {@link SortMergeJoinProcessor} can then move the
+ * iterators of the two inputs forward to find the joined keys, since both
+ * inputs are already sorted. <br> <li>Because of the implementation
+ * difference described above, the data requirements also differ between
+ * these 2 join algorithms. For {@link HashJoinExample} it is required that
+ * keys in the hashFile are unique, while for {@link SortMergeJoinExample} it
+ * is required that keys in both data sets are unique.
+ */
+public class SortMergeJoinExample extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(SortMergeJoinExample.class);
+
+ private static final String input1 = "input1";
+ private static final String input2 = "input2";
+ private static final String inputFile = "inputFile";
+ private static final String joiner = "joiner";
+ private static final String joinOutput = "joinOutput";
+
+ public static void main(String[] args) throws Exception {
+ SortMergeJoinExample job = new SortMergeJoinExample();
+ int status = ToolRunner.run(new Configuration(), job, args);
+ System.exit(status);
+ }
+
+ private static void printUsage() {
+ System.err.println("Usage: "
+ + "sortmergejoin <file1> <file2> <numPartitions> <outPath>");
+ ToolRunner.printGenericCommandUsage(System.err);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Configuration conf = getConf();
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs);
+ }
+
+ public int run(Configuration conf, String[] args, TezClient tezClient)
+ throws Exception {
+ setConf(conf);
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs, tezClient);
+ }
+
+ private int validateArgs(String[] otherArgs) {
+ if (otherArgs.length != 4) {
+ printUsage();
+ return 2;
+ }
+ return 0;
+ }
+
+ private int execute(String[] args) throws TezException, IOException,
+ InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ TezClient tezClient = null;
+ try {
+ tezClient = createTezClient(tezConf);
+ return execute(args, tezConf, tezClient);
+ } finally {
+ if (tezClient != null) {
+ tezClient.stop();
+ }
+ }
+ }
+
+ private int execute(String[] args, TezClient tezClient) throws IOException,
+ TezException, InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ return execute(args, tezConf, tezClient);
+ }
+
+ private TezClient createTezClient(TezConfiguration tezConf)
+ throws TezException, IOException {
+ TezClient tezClient = TezClient.create("SortMergeJoinExample", tezConf);
+ tezClient.start();
+ return tezClient;
+ }
+
+ private int execute(String[] args, TezConfiguration tezConf,
+ TezClient tezClient) throws IOException, TezException,
+ InterruptedException {
+ LOG.info("Running SortMergeJoinExample");
+
+ UserGroupInformation.setConfiguration(tezConf);
+
+ String inputDir1 = args[0];
+ String inputDir2 = args[1];
+ int numPartitions = Integer.parseInt(args[2]);
+ String outputDir = args[3];
+
+ Path inputPath1 = new Path(inputDir1);
+ Path inputPath2 = new Path(inputDir2);
+ Path outputPath = new Path(outputDir);
+
+ // Verify output path existence
+ FileSystem fs = FileSystem.get(tezConf);
+ if (fs.exists(outputPath)) {
+ System.err.println("Output directory: " + outputDir + " already exists");
+ return 3;
+ }
+ if (numPartitions <= 0) {
+ System.err.println("NumPartitions must be > 0");
+ return 4;
+ }
+
+ DAG dag =
+ createDag(tezConf, inputPath1, inputPath2, outputPath, numPartitions);
+
+ tezClient.waitTillReady();
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ return -1;
+ }
+ return 0;
+
+ }
+
+ /**
+ * v1 v2 <br>
+ * \ / <br>
+ * v3 <br>
+ *
+ * @param tezConf
+ * @param inputPath1
+ * @param inputPath2
+ * @param outPath
+ * @param numPartitions
+ * @return
+ * @throws IOException
+ */
+ private DAG createDag(TezConfiguration tezConf, Path inputPath1,
+ Path inputPath2, Path outPath, int numPartitions) throws IOException {
+ DAG dag = DAG.create("SortMergeJoinExample");
+
+ /**
+ * This vertex represents one side of the join. It reads text data using
+ * the TextInputFormat. ForwardingProcessor simply forwards the data
+ * downstream as is.
+ */
+ Vertex inputVertex1 =
+ Vertex.create("input1",
+ ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+ .addDataSource(
+ inputFile,
+ MRInput
+ .createConfigBuilder(new Configuration(tezConf),
+ TextInputFormat.class, inputPath1.toUri().toString())
+ .groupSplits(false).build());
+
+ /**
+ * The other vertex represents the other side of the join. It reads text
+ * data using the TextInputFormat. ForwardingProcessor simply forwards the
+ * data downstream as is.
+ */
+ Vertex inputVertex2 =
+ Vertex.create("input2",
+ ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+ .addDataSource(
+ inputFile,
+ MRInput
+ .createConfigBuilder(new Configuration(tezConf),
+ TextInputFormat.class, inputPath2.toUri().toString())
+ .groupSplits(false).build());
+
+ /**
+ * This vertex represents the join operation. It writes the join output as
+ * text using the TextOutputFormat. The SortMergeJoinProcessor is going to
+ * perform the join of the two sorted outputs from inputVertex1 and
+ * inputVertex2. It is load balanced across numPartitions.
+ */
+ Vertex joinVertex =
+ Vertex.create(joiner,
+ ProcessorDescriptor.create(SortMergeJoinProcessor.class.getName()),
+ numPartitions).addDataSink(
+ joinOutput,
+ MROutput.createConfigBuilder(new Configuration(tezConf),
+ TextOutputFormat.class, outPath.toUri().toString()).build());
+
+ /**
+ * The output of inputVertex1 and inputVertex2 will be partitioned into
+ * fragments with the same keys going to the same fragments using hash
+ * partitioning. The data to be joined is the key itself and so the value is
+ * null. These outputs will be sorted before being fed to the
+ * SortMergeJoinProcessor. The number of fragments is initially inferred from the
+ * number of tasks running in the join vertex because each task will be
+ * handling one fragment.
+ */
+ OrderedPartitionedKVEdgeConfig edgeConf =
+ OrderedPartitionedKVEdgeConfig
+ .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+ HashPartitioner.class.getName()).setFromConfiguration(tezConf)
+ .build();
+
+ /**
+ * Connect the join vertex with inputVertex1 with the EdgeProperty created
+ * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
+ * inputVertex1 is sorted before feeding it to SortMergeJoinProcessor
+ */
+ Edge e1 =
+ Edge.create(inputVertex1, joinVertex,
+ edgeConf.createDefaultEdgeProperty());
+ /**
+ * Connect the join vertex with inputVertex2 with the EdgeProperty created
+ * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
+ * inputVertex2 is sorted before feeding it to SortMergeJoinProcessor
+ */
+ Edge e2 =
+ Edge.create(inputVertex2, joinVertex,
+ edgeConf.createDefaultEdgeProperty());
+
+ dag.addVertex(inputVertex1).addVertex(inputVertex2).addVertex(joinVertex)
+ .addEdge(e1).addEdge(e2);
+ return dag;
+ }
+
+ /**
+ * Join 2 inputs which have already been sorted. Check the algorithm here <a
+ * href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge Join</a><br>
+ * It requires that the keys in both datasets are unique. <br>
+ * Disclaimer: The join code here is written as a tutorial for the APIs and
+ * not for performance.
+ */
+ public static class SortMergeJoinProcessor extends SimpleMRProcessor {
+
+ public SortMergeJoinProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ Preconditions.checkState(getInputs().size() == 2);
+ Preconditions.checkState(getOutputs().size() == 1);
+ // Get the input data for the 2 sides of the join from the 2 inputs
+ LogicalInput logicalInput1 = getInputs().get(input1);
+ LogicalInput logicalInput2 = getInputs().get(input2);
+ Reader inputReader1 = logicalInput1.getReader();
+ Reader inputReader2 = logicalInput2.getReader();
+ Preconditions.checkState(inputReader1 instanceof KeyValuesReader);
+ Preconditions.checkState(inputReader2 instanceof KeyValuesReader);
+ LogicalOutput lo = getOutputs().get(joinOutput);
+ Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
+ KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
+
+ join((KeyValuesReader) inputReader1, (KeyValuesReader) inputReader2,
+ writer);
+ }
+
+ /**
+ * Join 2 sorted inputs both from {@link KeyValuesReader} and write output
+ * using {@link KeyValueWriter}
+ *
+ * @param inputReader1
+ * @param inputReader2
+ * @param writer
+ * @throws IOException
+ */
+ private void join(KeyValuesReader inputReader1,
+ KeyValuesReader inputReader2, KeyValueWriter writer) throws IOException {
+
+ while (inputReader1.next() && inputReader2.next()) {
+ Text value1 = (Text) inputReader1.getCurrentKey();
+ Text value2 = (Text) inputReader2.getCurrentKey();
+ boolean reachEnd = false;
+ // move the cursors of the 2 inputs forward until the same value is found
+ // or one of them reaches the end.
+ while (value1.compareTo(value2) != 0) {
+ if (value1.compareTo(value2) > 0) {
+ if (inputReader2.next()) {
+ value2 = (Text) inputReader2.getCurrentKey();
+ } else {
+ reachEnd = true;
+ break;
+ }
+ } else {
+ if (inputReader1.next()) {
+ value1 = (Text) inputReader1.getCurrentKey();
+ } else {
+ reachEnd = true;
+ break;
+ }
+ }
+ }
+
+ if (reachEnd) {
+ break;
+ } else {
+ writer.write(value1, NullWritable.get());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 1fbacdf..56f62a4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -52,8 +52,9 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.examples.SimpleSessionExample;
import org.apache.tez.examples.JoinDataGen;
-import org.apache.tez.examples.JoinExample;
+import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.examples.SortMergeJoinExample;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -117,13 +118,13 @@ public class TestTezJobs {
}
@Test(timeout = 60000)
- public void testIntersectExample() throws Exception {
- JoinExample intersectExample = new JoinExample();
- intersectExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+ public void testHashJoinExample() throws Exception {
+ HashJoinExample hashJoinExample = new HashJoinExample();
+ hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
Path stagingDirPath = new Path("/tmp/tez-staging-dir");
- Path inPath1 = new Path("/tmp/inPath1");
- Path inPath2 = new Path("/tmp/inPath2");
- Path outPath = new Path("/tmp/outPath");
+ Path inPath1 = new Path("/tmp/hashJoin/inPath1");
+ Path inPath2 = new Path("/tmp/hashJoin/inPath2");
+ Path outPath = new Path("/tmp/hashJoin/outPath");
remoteFs.mkdirs(inPath1);
remoteFs.mkdirs(inPath2);
remoteFs.mkdirs(stagingDirPath);
@@ -152,7 +153,7 @@ public class TestTezJobs {
String[] args = new String[] {
"-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
- assertEquals(0, intersectExample.run(args));
+ assertEquals(0, hashJoinExample.run(args));
FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
public boolean accept(Path p) {
@@ -172,10 +173,119 @@ public class TestTezJobs {
assertEquals(0, expectedResult.size());
}
+ @Test(timeout = 60000)
+ public void testSortMergeJoinExample() throws Exception {
+ SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
+ sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+ Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+ Path inPath1 = new Path("/tmp/sortMerge/inPath1");
+ Path inPath2 = new Path("/tmp/sortMerge/inPath2");
+ Path outPath = new Path("/tmp/sortMerge/outPath");
+ remoteFs.mkdirs(inPath1);
+ remoteFs.mkdirs(inPath2);
+ remoteFs.mkdirs(stagingDirPath);
+
+ Set<String> expectedResult = new HashSet<String>();
+
+ FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
+ FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
+ BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
+ BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2));
+ for (int i = 0; i < 20; i++) {
+ String term = "term" + i;
+ writer1.write(term);
+ writer1.newLine();
+ if (i % 2 == 0) {
+ writer2.write(term);
+ writer2.newLine();
+ expectedResult.add(term);
+ }
+ }
+ writer1.close();
+ writer2.close();
+ out1.close();
+ out2.close();
+
+ String[] args = new String[] {
+ "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+ inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+ assertEquals(0, sortMergeJoinExample.run(args));
+
+ FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ });
+ assertEquals(1, statuses.length);
+ FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ assertTrue(expectedResult.remove(line));
+ }
+ reader.close();
+ inStream.close();
+ assertEquals(0, expectedResult.size());
+ }
+
+ /**
+ * Tests the whole {@link HashJoinExample} pipeline as follows: <br>
+ * {@link JoinDataGen} -> {@link HashJoinExample} -> {@link JoinValidate}
+ * @throws Exception
+ */
+ @Test(timeout = 120000)
+ public void testHashJoinExamplePipeline() throws Exception {
+
+ Path testDir = new Path("/tmp/testHashJoinExample");
+ Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+ remoteFs.mkdirs(testDir);
+
+ Path dataPath1 = new Path(testDir, "inPath1");
+ Path dataPath2 = new Path(testDir, "inPath2");
+ Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+ Path outPath = new Path(testDir, "outPath");
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ TezClient tezSession = null;
+ try {
+ tezSession = TezClient.create("HashJoinExampleSession", tezConf, true);
+ tezSession.start();
+
+ JoinDataGen dataGen = new JoinDataGen();
+ String[] dataGenArgs = new String[] {
+ dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+ expectedOutputPath.toString(), "2" };
+ assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+ HashJoinExample joinExample = new HashJoinExample();
+ String[] args = new String[] {
+ dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
+ assertEquals(0, joinExample.run(tezConf, args, tezSession));
+
+ JoinValidate joinValidate = new JoinValidate();
+ String[] validateArgs = new String[] {
+ expectedOutputPath.toString(), outPath.toString(), "3" };
+ assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
+
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
+ /**
+ * Tests the whole {@link SortMergeJoinExample} pipeline as follows: <br>
+ * {@link JoinDataGen} -> {@link SortMergeJoinExample} -> {@link JoinValidate}
+ * @throws Exception
+ */
@Test(timeout = 120000)
- public void testIntersect2() throws Exception {
+ public void testSortMergeJoinExamplePipeline() throws Exception {
- Path testDir = new Path("/tmp/testIntersect2");
+ Path testDir = new Path("/tmp/testSortMergeExample");
Path stagingDirPath = new Path("/tmp/tez-staging-dir");
remoteFs.mkdirs(stagingDirPath);
remoteFs.mkdirs(testDir);
@@ -189,7 +299,7 @@ public class TestTezJobs {
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
TezClient tezSession = null;
try {
- tezSession = TezClient.create("IntersectExampleSession", tezConf);
+ tezSession = TezClient.create("SortMergeExampleSession", tezConf, true);
tezSession.start();
JoinDataGen dataGen = new JoinDataGen();
@@ -198,15 +308,15 @@ public class TestTezJobs {
expectedOutputPath.toString(), "2" };
assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
- JoinExample intersect = new JoinExample();
- String[] intersectArgs = new String[] {
+ SortMergeJoinExample joinExample = new SortMergeJoinExample();
+ String[] args = new String[] {
dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
- assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession));
+ assertEquals(0, joinExample.run(tezConf, args, tezSession));
- JoinValidate intersectValidate = new JoinValidate();
- String[] intersectValidateArgs = new String[] {
+ JoinValidate joinValidate = new JoinValidate();
+ String[] validateArgs = new String[] {
expectedOutputPath.toString(), outPath.toString(), "3" };
- assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));
+ assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
} finally {
if (tezSession != null) {