Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/22 07:40:58 UTC

git commit: TEZ-1499. Add SortMergeJoinExample to tez-examples (Jeff Zhang via bikas)

Repository: tez
Updated Branches:
  refs/heads/master 55ae1e57c -> 4023898c1


TEZ-1499. Add SortMergeJoinExample to tez-examples (Jeff Zhang via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4023898c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4023898c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4023898c

Branch: refs/heads/master
Commit: 4023898c109b8e50e71fa08eacdf294b66d162d1
Parents: 55ae1e5
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Sep 21 22:40:47 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Sep 21 22:40:47 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tez/examples/ExampleDriver.java  |   6 +-
 .../apache/tez/examples/HashJoinExample.java    | 403 +++++++++++++++++++
 .../org/apache/tez/examples/JoinDataGen.java    |   1 -
 .../org/apache/tez/examples/JoinExample.java    | 364 -----------------
 .../org/apache/tez/examples/JoinValidate.java   |   4 +-
 .../tez/examples/SortMergeJoinExample.java      | 374 +++++++++++++++++
 .../java/org/apache/tez/test/TestTezJobs.java   | 144 ++++++-
 8 files changed, 913 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fe7ff0..407acc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
   TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
   TEZ-1578. Remove TeraSort from Tez codebase.
+  TEZ-1499. Add SortMergeJoinExample to tez-examples
 
 ALL CHANGES:
   TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
@@ -14,6 +15,7 @@ ALL CHANGES:
   TEZ-853. Support counters recovery.
   TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
   TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
+  TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
   TEZ-1578. Remove TeraSort from Tez codebase.
   TEZ-1569. Add tests for preemption
   TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
@@ -21,6 +23,7 @@ ALL CHANGES:
   TEZ-1581. GroupByOrderByMRRTest no longer functional.
   TEZ-1157. Optimize broadcast shuffle to download data only once per host. 
   TEZ-1607. support mr envs in mrrsleep and testorderedwordcount
+  TEZ-1499. Add SortMergeJoinExample to tez-examples
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
index 6f689e2..5394cc1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
@@ -50,8 +50,10 @@ public class ExampleDriver {
           "Word Count with words sorted on frequency");
       pgd.addClass("simplesessionexample", SimpleSessionExample.class,
           "Example to run multiple dags in a session");
-      pgd.addClass("joinexample", JoinExample.class,
-          "Identify all occurences of lines in file1 which also occur in file2");
+      pgd.addClass("hashjoin", HashJoinExample.class,
+          "Identify all occurences of lines in file1 which also occur in file2 using hash join");
+      pgd.addClass("sortmergejoin", SortMergeJoinExample.class,
+          "Identify all occurences of lines in file1 which also occur in file2 using sort merge join");
       pgd.addClass("joindatagen", JoinDataGen.class,
           "Generate data to run the joinexample");
       pgd.addClass("joinvalidate", JoinValidate.class,

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
new file mode 100644
index 0000000..76e53e1
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple example of joining 2 data sets using <a
+ * href="http://en.wikipedia.org/wiki/Hash_join">Hash Join</a>.<br>
+ * The example shows a vertex with multiple inputs that represent the two data
+ * sets that need to be joined. This HashJoinExample assumes that the keys in
+ * the second dataset (hashSide) are unique.<br>
+ * The join can be performed using a broadcast (or replicate-fragment) join in
+ * which the small side of the join is broadcast in total to fragments of the
+ * larger side. Each fragment of the larger side can perform the join operation
+ * independently using the full data of the smaller side. This shows the usage
+ * of the broadcast edge property in Tez. <br>
+ * The join can be performed using the regular repartition join where both sides
+ * are partitioned according to the same scheme into the same number of
+ * fragments. Then the keys in the same fragment are joined with each other.
+ * This is the default join strategy.
+ */
+public class HashJoinExample extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(HashJoinExample.class);
+
+  private static final String broadcastOption = "doBroadcast";
+  private static final String streamingSide = "streamingSide";
+  private static final String hashSide = "hashSide";
+  private static final String inputFile = "inputFile";
+  private static final String joiner = "joiner";
+  private static final String joinOutput = "joinOutput";
+
+  public static void main(String[] args) throws Exception {
+    HashJoinExample job = new HashJoinExample();
+    int status = ToolRunner.run(new Configuration(), job, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: "
+        + "hashjoin <file1> <file2> <numPartitions> <outPath> ["
+        + broadcastOption + "(default false)]");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs =
+        new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+
+  public int run(Configuration conf, String[] args, TezClient tezClient)
+      throws Exception {
+    setConf(conf);
+    String[] otherArgs =
+        new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezClient);
+  }
+
+  private int validateArgs(String[] otherArgs) {
+    if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
+      printUsage();
+      return 2;
+    }
+    return 0;
+  }
+
+  private int execute(String[] args) throws TezException, IOException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezClient tezClient = null;
+    try {
+      tezClient = createTezClient(tezConf);
+      return execute(args, tezConf, tezClient);
+    } finally {
+      if (tezClient != null) {
+        tezClient.stop();
+      }
+    }
+  }
+
+  private int execute(String[] args, TezClient tezClient) throws IOException,
+      TezException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezClient);
+  }
+
+  private TezClient createTezClient(TezConfiguration tezConf)
+      throws TezException, IOException {
+    TezClient tezClient = TezClient.create("HashJoinExample", tezConf);
+    tezClient.start();
+    return tezClient;
+  }
+
+  private int execute(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws IOException, TezException,
+      InterruptedException {
+    boolean doBroadcast =
+        args.length == 5 && args[4].equals(broadcastOption);
+    LOG.info("Running HashJoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
+
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String streamInputDir = args[0];
+    String hashInputDir = args[1];
+    int numPartitions = Integer.parseInt(args[2]);
+    String outputDir = args[3];
+
+    Path streamInputPath = new Path(streamInputDir);
+    Path hashInputPath = new Path(hashInputDir);
+    Path outputPath = new Path(outputDir);
+
+    // Verify output path existence
+    FileSystem fs = FileSystem.get(tezConf);
+    if (fs.exists(outputPath)) {
+      System.err.println("Output directory: " + outputDir + " already exists");
+      return 3;
+    }
+    if (numPartitions <= 0) {
+      System.err.println("NumPartitions must be > 0");
+      return 4;
+    }
+
+    DAG dag =
+        createDag(tezConf, streamInputPath, hashInputPath, outputPath,
+            numPartitions, doBroadcast);
+
+    tezClient.waitTillReady();
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path streamPath,
+      Path hashPath, Path outPath, int numPartitions, boolean doBroadcast)
+      throws IOException {
+    DAG dag = DAG.create("HashJoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
+
+    /**
+     * This vertex represents the side of the join that will be accumulated in a
+     * hash table in order to join it against the other side. It reads text data
+     * using the TextInputFormat. ForwardingProcessor simply forwards the data
+     * downstream as is.
+     */
+    Vertex hashFileVertex =
+        Vertex.create(hashSide,
+            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+            .addDataSource(
+                inputFile,
+                MRInput
+                    .createConfigBuilder(new Configuration(tezConf),
+                        TextInputFormat.class, hashPath.toUri().toString())
+                    .groupSplits(false).build());
+
+    /**
+     * This vertex represents the side of the data that will be streamed and
+     * joined against the other side that has been accumulated into a hash
+     * table. It reads text data using the TextInputFormat. ForwardingProcessor
+     * simply forwards the data downstream as is.
+     */
+    Vertex streamFileVertex =
+        Vertex.create(streamingSide,
+            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+            .addDataSource(
+                inputFile,
+                MRInput
+                    .createConfigBuilder(new Configuration(tezConf),
+                        TextInputFormat.class, streamPath.toUri().toString())
+                    .groupSplits(false).build());
+
+    /**
+     * This vertex represents the join operation. It writes the join output as
+     * text using the TextOutputFormat. The JoinProcessor is going to perform
+     * the join of the streaming side and the hash side. It is load balanced
+     * across numPartitions
+     */
+    Vertex joinVertex =
+        Vertex.create(joiner,
+            ProcessorDescriptor.create(HashJoinProcessor.class.getName()),
+            numPartitions).addDataSink(
+            joinOutput,
+            MROutput.createConfigBuilder(new Configuration(tezConf),
+                TextOutputFormat.class, outPath.toUri().toString()).build());
+
+    /**
+     * The streamed side will be partitioned into fragments with the same keys
+     * going to the same fragments using hash partitioning. The data to be
+     * joined is the key itself and so the value is null. The number of
+     * fragments is initially inferred from the number of tasks running in the
+     * join vertex because each task will be handling one fragment.
+     */
+    UnorderedPartitionedKVEdgeConfig streamConf =
+        UnorderedPartitionedKVEdgeConfig
+            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+                HashPartitioner.class.getName()).setFromConfiguration(tezConf)
+            .build();
+
+    /**
+     * Connect the join vertex with the stream side
+     */
+    Edge e1 =
+        Edge.create(streamFileVertex, joinVertex,
+            streamConf.createDefaultEdgeProperty());
+
+    EdgeProperty hashSideEdgeProperty = null;
+    if (doBroadcast) {
+      /**
+       * This option can be used when the hash side is small. We can broadcast
+       * the entire data to all fragments of the stream side. This avoids
+       * re-partitioning the fragments of the stream side to match the
+       * partitioning scheme of the hash side and avoids costly network data
+       * transfer. However, in this example the stream side is being partitioned
+       * in both cases for brevity of code. The join task can perform the join
+       * of its fragment of keys with all the keys of the hash side. An
+       * unpartitioned edge is used to transfer the complete output of the
+       * hash side, broadcast to all fragments of the streamed side. Again,
+       * since the data is the key, the value is null.
+       */
+      UnorderedKVEdgeConfig broadcastConf =
+          UnorderedKVEdgeConfig
+              .newBuilder(Text.class.getName(), NullWritable.class.getName())
+              .setFromConfiguration(tezConf).build();
+      hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
+    } else {
+      /**
+       * The hash side is also being partitioned into fragments with the same
+       * key going to the same fragment using hash partitioning. This way all
+       * keys with the same hash value will go to the same fragment from both
+       * sides. Thus the join task handling that fragment can join both data set
+       * fragments.
+       */
+      hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
+    }
+
+    /**
+     * Connect the join vertex to the hash side. The join vertex is connected
+     * with 2 upstream vertices that provide it with inputs
+     */
+    Edge e2 = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty);
+
+    /**
+     * Connect everything up by adding them to the DAG
+     */
+    dag.addVertex(streamFileVertex).addVertex(hashFileVertex)
+        .addVertex(joinVertex).addEdge(e1).addEdge(e2);
+    return dag;
+  }
+
+  /**
+   * Reads key-values from the source and forwards the value as the key for the
+   * output
+   */
+  public static class ForwardingProcessor extends SimpleProcessor {
+    public ForwardingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 1);
+      Preconditions.checkState(getOutputs().size() == 1);
+      // not looking up inputs and outputs by name because there is just one
+      // instance and this processor is used in many different DAGs with
+      // different names for inputs and outputs
+      LogicalInput input = getInputs().values().iterator().next();
+      Reader rawReader = input.getReader();
+      Preconditions.checkState(rawReader instanceof KeyValueReader);
+      LogicalOutput output = getOutputs().values().iterator().next();
+
+      KeyValueReader reader = (KeyValueReader) rawReader;
+      KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+
+      while (reader.next()) {
+        Object val = reader.getCurrentValue();
+        // The data value itself is the join key. Simply write it out as the
+        // key.
+        // The output value is null.
+        writer.write(val, NullWritable.get());
+      }
+    }
+  }
+
+  /**
+   * Join 2 inputs using the Hash Join algorithm. See the algorithm here: <a
+   * href="http://en.wikipedia.org/wiki/Hash_join">Hash Join</a> <br>
+   * It outputs all occurrences of keys in the streamFile which also exist in
+   * the hashFile. This requires that the keys in the hashFile are unique.
+   * <br>Disclaimer: The join code here is written as a tutorial for the APIs and
+   * not for performance.
+   */
+  public static class HashJoinProcessor extends SimpleMRProcessor {
+
+    public HashJoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 2);
+      Preconditions.checkState(getOutputs().size() == 1);
+      // Get the input data for the 2 sides of the join from the 2 inputs
+      LogicalInput streamInput = getInputs().get(streamingSide);
+      LogicalInput hashInput = getInputs().get(hashSide);
+      Reader rawStreamReader = streamInput.getReader();
+      Reader rawHashReader = hashInput.getReader();
+      Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
+      Preconditions.checkState(rawHashReader instanceof KeyValueReader);
+      LogicalOutput lo = getOutputs().get(joinOutput);
+      Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
+      KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
+
+      // create a hash table for the hash side
+      KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
+      Set<Text> keySet = new HashSet<Text>();
+      while (hashKvReader.next()) {
+        keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
+      }
+
+      // read the stream side and join it using the hash table
+      KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
+      while (streamKvReader.next()) {
+        Text key = (Text) streamKvReader.getCurrentKey();
+        if (keySet.contains(key)) {
+          writer.write(key, NullWritable.get());
+        }
+      }
+    }
+  }
+}
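
Besides the main() entry point, HashJoinExample exposes
run(Configuration, String[], TezClient) so that a caller can reuse an already
running Tez session (this is how TestTezJobs drives it below). A minimal
sketch of that programmatic use, assuming a working cluster Configuration and
hypothetical /tmp input and output paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.examples.HashJoinExample;

    public class HashJoinDriverSketch {
      public static void main(String[] args) throws Exception {
        TezConfiguration tezConf = new TezConfiguration(new Configuration());
        // Shared session client, created the same way as in TestTezJobs.
        TezClient tezClient = TezClient.create("HashJoinExampleSession", tezConf, true);
        tezClient.start();
        try {
          HashJoinExample hashJoin = new HashJoinExample();
          // args: <file1> <file2> <numPartitions> <outPath> [doBroadcast]
          // The paths below are hypothetical placeholders.
          String[] joinArgs = new String[] {
              "/tmp/joinIn1", "/tmp/joinIn2", "2", "/tmp/joinOut", "doBroadcast" };
          int rc = hashJoin.run(tezConf, joinArgs, tezClient);
          System.exit(rc);
        } finally {
          tezClient.stop();
        }
      }
    }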

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
index 8231b6f..ff73247 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
@@ -73,7 +73,6 @@ public class JoinDataGen extends Configured implements Tool {
     System.err
         .println("Usage: "
             + "joindatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
-    ;
     ToolRunner.printGenericCommandUsage(System.err);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
deleted file mode 100644
index 3611fd6..0000000
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinExample.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.examples;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Simple example of joining 2 data sets.
- * <br>The example shows a vertex with multiple inputs that represent the two
- * data sets that need to be joined.
- * <br>The join can be performed using a broadcast (or replicate-fragment) join in 
- * which the small side of the join is broadcast in total to fragments of the 
- * larger side. Each fragment of the larger side can perform the join operation
- * independently using the full data of the smaller side. This shows the usage 
- * of the broadcast edge property in Tez.
- * <br>The join can be performed using the regular repartition join where both 
- * sides are partitioned according to the same scheme into the same number of 
- * fragments. Then the keys in the same fragment are joined with each other. This 
- * is the default join strategy.
- *
- */
-public class JoinExample extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(JoinExample.class);
-  
-  private static final String broadcastOption = "doBroadcast";
-  private static final String streamingSide = "streamingSide";
-  private static final String hashSide = "hashSide";
-  private static final String inputFile = "inputFile";
-  private static final String joiner = "joiner";
-  private static final String joinOutput = "joinOutput";
-  
-
-  public static void main(String[] args) throws Exception {
-    JoinExample job = new JoinExample();
-    int status = ToolRunner.run(new Configuration(), job, args);
-    System.exit(status);
-  }
-
-  private static void printUsage() {
-    System.err.println("Usage: " + "joinexample <file1> <file2> <numPartitions> <outPath> [" 
-                       + broadcastOption + "(default false)]");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezClient);
-  }
-  
-  private int validateArgs(String[] otherArgs) {
-    if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezClient = null;
-    try {
-      tezClient = createTezClient(tezConf);
-      return execute(args, tezConf, tezClient);
-    } finally {
-      if (tezClient != null) {
-        tezClient.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezClient) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezClient);
-  }
-
-  private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezClient = TezClient.create("JoinExample", tezConf);
-    tezClient.start();
-    return tezClient;
-  }
-  
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient)
-      throws IOException, TezException, InterruptedException {
-    boolean doBroadcast = args.length == 5 && args[4].equals(broadcastOption) ? true : false;
-    LOG.info("Running JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
-
-    UserGroupInformation.setConfiguration(tezConf);
-
-    String streamInputDir = args[0];
-    String hashInputDir = args[1];
-    int numPartitions = Integer.parseInt(args[2]);
-    String outputDir = args[3];
-
-    Path streamInputPath = new Path(streamInputDir);
-    Path hashInputPath = new Path(hashInputDir);
-    Path outputPath = new Path(outputDir);
-
-    // Verify output path existence
-    FileSystem fs = FileSystem.get(tezConf);
-    if (fs.exists(outputPath)) {
-      System.err.println("Output directory: " + outputDir + " already exists");
-      return 3;
-    }
-    if (numPartitions <= 0) {
-      System.err.println("NumPartitions must be > 0");
-      return 4;
-    }
-
-    DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions, doBroadcast);
-
-    tezClient.waitTillReady();
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
-    }
-    return 0;
-
-  }
-
-  private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
-      int numPartitions, boolean doBroadcast) throws IOException {
-    DAG dag = DAG.create("JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
-
-    /**
-     * This vertex represents the side of the join that will be accumulated in a hash 
-     * table in order to join it against the other side. It reads text data using the
-     * TextInputFormat. ForwardingProcessor simply forwards the data downstream as is.
-     */
-    Vertex hashFileVertex = Vertex.create(hashSide, ProcessorDescriptor.create(
-        ForwardingProcessor.class.getName())).addDataSource(
-        inputFile,
-        MRInput
-            .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                hashPath.toUri().toString()).groupSplits(false).build());
-
-    /**
-     * This vertex represents that side of the data that will be streamed and joined 
-     * against the other side that has been accumulated into a hash table. It reads 
-     * text data using the TextInputFormat. ForwardingProcessor simply forwards the data 
-     * downstream as is.
-     */
-    Vertex streamFileVertex = Vertex.create(streamingSide, ProcessorDescriptor.create(
-        ForwardingProcessor.class.getName())).addDataSource(
-        inputFile,
-        MRInput
-            .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                streamPath.toUri().toString()).groupSplits(false).build());
-
-    /**
-     * This vertex represents the join operation. It writes the join output as text using
-     * the TextOutputFormat. The JoinProcessor is going to perform the join of the 
-     * streaming side and the hash side. It is load balanced across numPartitions 
-     */
-    Vertex joinVertex = Vertex.create(joiner, ProcessorDescriptor.create(
-        JoinProcessor.class.getName()), numPartitions).addDataSink(joinOutput,
-        MROutput.createConfigBuilder(new Configuration(tezConf),
-            TextOutputFormat.class, outPath.toUri().toString()).build());
-
-    /**
-     * The streamed side will be partitioned into fragments with the same keys going to 
-     * the same fragments using hash partitioning. The data to be joined is the key itself
-     * and so the value is null. The number of fragments is initially inferred from the 
-     * number of tasks running in the join vertex because each task will be handling one
-     * fragment.
-     */
-    UnorderedPartitionedKVEdgeConfig streamConf =
-        UnorderedPartitionedKVEdgeConfig
-            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
-                HashPartitioner.class.getName()).setFromConfiguration(tezConf).build();
-
-    /**
-     * Connect the join vertex with the stream side
-     */
-    Edge e1 = Edge.create(streamFileVertex, joinVertex, streamConf.createDefaultEdgeProperty());
-    
-    EdgeProperty hashSideEdgeProperty = null;
-    if (doBroadcast) {
-      /**
-       * This option can be used when the hash side is small. We can broadcast the entire data to 
-       * all fragments of the stream side. This avoids re-partitioning the fragments of the stream 
-       * side to match the partitioning scheme of the hash side and avoids costly network data 
-       * transfer. However, in this example the stream side is being partitioned in both cases for 
-       * brevity of code. The join task can perform the join of its fragment of keys with all the 
-       * keys of the hash side.
-       * Using an unpartitioned edge to transfer the complete output of the hash side to be 
-       * broadcasted to all fragments of the streamed side. Again, since the data is the key, the 
-       * value is null.
-       */
-      UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig.newBuilder(Text.class.getName(),
-          NullWritable.class.getName()).setFromConfiguration(tezConf).build();
-      hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
-    } else {
-      /**
-       * The hash side is also being partitioned into fragments with the same key going to the same
-       * fragment using hash partitioning. This way all keys with the same hash value will go to the
-       * same fragment from both sides. Thus the join task handling that fragment can join both data
-       * set fragments. 
-       */
-      hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
-    }
-
-    /**
-     * Connect the join vertex to the hash side.
-     * The join vertex is connected with 2 upstream vertices that provide it with inputs
-     */
-    Edge e2 = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty);
-
-    /**
-     * Connect everything up by adding them to the DAG
-     */
-    dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(joinVertex)
-        .addEdge(e1).addEdge(e2);
-    return dag;
-  }
-
-  /**
-   * Reads key-values from the source and forwards the value as the key for the output
-   */
-  public static class ForwardingProcessor extends SimpleProcessor {
-    public ForwardingProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 1);
-      Preconditions.checkState(getOutputs().size() == 1);
-      // not looking up inputs and outputs by name because there is just one
-      // instance and this processor is used in many different DAGs with 
-      // different names for inputs and outputs
-      LogicalInput input = getInputs().values().iterator().next();
-      Reader rawReader = input.getReader();
-      Preconditions.checkState(rawReader instanceof KeyValueReader);
-      LogicalOutput output = getOutputs().values().iterator().next();
-
-      KeyValueReader reader = (KeyValueReader) rawReader;
-      KeyValueWriter writer = (KeyValueWriter) output.getWriter();
-
-      while (reader.next()) {
-        Object val = reader.getCurrentValue();
-        // The data value itself is the join key. Simply write it out as the key.
-        // The output value is null.
-        writer.write(val, NullWritable.get());
-      }
-    }
-  }
-
-  public static class JoinProcessor extends SimpleMRProcessor {
-
-    public JoinProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 2);
-      Preconditions.checkState(getOutputs().size() == 1);
-      // Get the input data for the 2 sides of the join from the 2 inputs
-      LogicalInput streamInput = getInputs().get(streamingSide);
-      LogicalInput hashInput = getInputs().get(hashSide);
-      Reader rawStreamReader = streamInput.getReader();
-      Reader rawHashReader = hashInput.getReader();
-      Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
-      Preconditions.checkState(rawHashReader instanceof KeyValueReader);
-      LogicalOutput lo = getOutputs().get(joinOutput);
-      Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
-      KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
-
-      // create a hash table for the hash side
-      KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
-      Set<Text> keySet = new HashSet<Text>();
-      while (hashKvReader.next()) {
-        keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
-      }
-
-      // read the stream side and join it using the hash table
-      KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
-      while (streamKvReader.next()) {
-        Text key = (Text) streamKvReader.getCurrentKey();
-        if (keySet.contains(key)) {
-          writer.write(key, NullWritable.get());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index e94df09..17e3dc8 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -43,7 +43,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.examples.JoinExample.ForwardingProcessor;
+import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
@@ -57,7 +57,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 public class JoinValidate extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(JoinExample.class);
+  private static final Log LOG = LogFactory.getLog(JoinValidate.class);
 
   private static final String LHS_INPUT_NAME = "lhsfile";
   private static final String RHS_INPUT_NAME = "rhsfile";

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
new file mode 100644
index 0000000..b9ba87a
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple example of joining 2 data sets using <a
+ * href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge Join</a><br>
+ * There are 2 differences between {@link SortMergeJoinExample} and
+ * {@link HashJoinExample}. <li>{@link HashJoinExample} always loads one data
+ * set (hashFile) into memory, which requires that dataset to be small enough
+ * to fit into memory, while {@link SortMergeJoinExample} does not load either
+ * data set into memory; it just sorts the outputs of the datasets before
+ * feeding them to {@link SortMergeJoinProcessor}, just like the sort phase
+ * before reduce in traditional MapReduce. The iterators of the two inputs can
+ * then be moved forward in {@link SortMergeJoinProcessor} to find the joined
+ * keys since they are both already sorted. <br> <li>Because of the sort
+ * implementation difference described above, the data requirements also
+ * differ for these 2 join algorithms. {@link HashJoinExample} requires that
+ * keys in the hashFile are unique, while {@link SortMergeJoinExample} requires
+ * that keys in both datasets are unique.
+ */
+public class SortMergeJoinExample extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(SortMergeJoinExample.class);
+
+  private static final String input1 = "input1";
+  private static final String input2 = "input2";
+  private static final String inputFile = "inputFile";
+  private static final String joiner = "joiner";
+  private static final String joinOutput = "joinOutput";
+
+  public static void main(String[] args) throws Exception {
+    SortMergeJoinExample job = new SortMergeJoinExample();
+    int status = ToolRunner.run(new Configuration(), job, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: "
+        + "sortmergejoin <file1> <file2> <numPartitions> <outPath>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs =
+        new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+
+  public int run(Configuration conf, String[] args, TezClient tezClient)
+      throws Exception {
+    setConf(conf);
+    String[] otherArgs =
+        new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezClient);
+  }
+
+  private int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 4) {
+      printUsage();
+      return 2;
+    }
+    return 0;
+  }
+
+  private int execute(String[] args) throws TezException, IOException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezClient tezClient = null;
+    try {
+      tezClient = createTezClient(tezConf);
+      return execute(args, tezConf, tezClient);
+    } finally {
+      if (tezClient != null) {
+        tezClient.stop();
+      }
+    }
+  }
+
+  private int execute(String[] args, TezClient tezClient) throws IOException,
+      TezException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezClient);
+  }
+
+  private TezClient createTezClient(TezConfiguration tezConf)
+      throws TezException, IOException {
+    TezClient tezClient = TezClient.create("SortMergeJoinExample", tezConf);
+    tezClient.start();
+    return tezClient;
+  }
+
+  private int execute(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws IOException, TezException,
+      InterruptedException {
+    LOG.info("Running SortMergeJoinExample");
+
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String inputDir1 = args[0];
+    String inputDir2 = args[1];
+    int numPartitions = Integer.parseInt(args[2]);
+    String outputDir = args[3];
+
+    Path inputPath1 = new Path(inputDir1);
+    Path inputPath2 = new Path(inputDir2);
+    Path outputPath = new Path(outputDir);
+
+    // Verify output path existence
+    FileSystem fs = FileSystem.get(tezConf);
+    if (fs.exists(outputPath)) {
+      System.err.println("Output directory: " + outputDir + " already exists");
+      return 3;
+    }
+    if (numPartitions <= 0) {
+      System.err.println("NumPartitions must be > 0");
+      return 4;
+    }
+
+    DAG dag =
+        createDag(tezConf, inputPath1, inputPath2, outputPath, numPartitions);
+
+    tezClient.waitTillReady();
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+
+  }
+
+  /**
+   * v1 v2 <br>
+   * &nbsp;\&nbsp;/ <br>
+   * &nbsp;&nbsp;v3 <br>
+   * 
+   * @param tezConf
+   * @param inputPath1
+   * @param inputPath2
+   * @param outPath
+   * @param numPartitions
+   * @return
+   * @throws IOException
+   */
+  private DAG createDag(TezConfiguration tezConf, Path inputPath1,
+      Path inputPath2, Path outPath, int numPartitions) throws IOException {
+    DAG dag = DAG.create("SortMergeJoinExample");
+
+    /**
+     * This vertex represents one side of the join. It reads text data using
+     * the TextInputFormat. ForwardingProcessor simply forwards the data
+     * downstream as is.
+     */
+    Vertex inputVertex1 =
+        Vertex.create("input1",
+            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+            .addDataSource(
+                inputFile,
+                MRInput
+                    .createConfigBuilder(new Configuration(tezConf),
+                        TextInputFormat.class, inputPath1.toUri().toString())
+                    .groupSplits(false).build());
+
+    /**
+     * The other vertex represents the other side of the join. It reads text
+     * data using the TextInputFormat. ForwardingProcessor simply forwards the
+     * data downstream as is.
+     */
+    Vertex inputVertex2 =
+        Vertex.create("input2",
+            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
+            .addDataSource(
+                inputFile,
+                MRInput
+                    .createConfigBuilder(new Configuration(tezConf),
+                        TextInputFormat.class, inputPath2.toUri().toString())
+                    .groupSplits(false).build());
+
+    /**
+     * This vertex represents the join operation. It writes the join output as
+     * text using the TextOutputFormat. The SortMergeJoinProcessor is going to
+     * perform the join of the two sorted outputs from inputVertex1 and
+     * inputVertex2. It is load balanced across numPartitions.
+     */
+    Vertex joinVertex =
+        Vertex.create(joiner,
+            ProcessorDescriptor.create(SortMergeJoinProcessor.class.getName()),
+            numPartitions).addDataSink(
+            joinOutput,
+            MROutput.createConfigBuilder(new Configuration(tezConf),
+                TextOutputFormat.class, outPath.toUri().toString()).build());
+
+    /**
+     * The output of inputVertex1 and inputVertex2 will be partitioned into
+     * fragments with the same keys going to the same fragments using hash
+     * partitioning. The data to be joined is the key itself and so the value is
+     * null. These outputs will be sorted before being fed to the
+     * SortMergeJoinProcessor. The number of fragments is initially inferred
+     * from the number of tasks running in the join vertex because each task
+     * will be handling one fragment.
+     */
+    OrderedPartitionedKVEdgeConfig edgeConf =
+        OrderedPartitionedKVEdgeConfig
+            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+                HashPartitioner.class.getName()).setFromConfiguration(tezConf)
+            .build();
+
+    /**
+     * Connect the join vertex with inputVertex1 with the EdgeProperty created
+     * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
+     * inputVertex1 is sorted before feeding it to SortMergeJoinProcessor
+     */
+    Edge e1 =
+        Edge.create(inputVertex1, joinVertex,
+            edgeConf.createDefaultEdgeProperty());
+    /**
+     * Connect the join vertex with inputVertex2 with the EdgeProperty created
+     * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
+     * inputVertex2 is sorted before feeding it to SortMergeJoinProcessor
+     */
+    Edge e2 =
+        Edge.create(inputVertex2, joinVertex,
+            edgeConf.createDefaultEdgeProperty());
+
+    dag.addVertex(inputVertex1).addVertex(inputVertex2).addVertex(joinVertex)
+        .addEdge(e1).addEdge(e2);
+    return dag;
+  }
+
+  /**
+   * Join 2 inputs which have already been sorted. See the algorithm here: <a
+   * href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge Join</a><br>
+   * It requires that the keys in both datasets are unique. <br>
+   * Disclaimer: The join code here is written as a tutorial for the APIs and
+   * not for performance.
+   */
+  public static class SortMergeJoinProcessor extends SimpleMRProcessor {
+
+    public SortMergeJoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 2);
+      Preconditions.checkState(getOutputs().size() == 1);
+      // Get the input data for the 2 sides of the join from the 2 inputs
+      LogicalInput logicalInput1 = getInputs().get(input1);
+      LogicalInput logicalInput2 = getInputs().get(input2);
+      Reader inputReader1 = logicalInput1.getReader();
+      Reader inputReader2 = logicalInput2.getReader();
+      Preconditions.checkState(inputReader1 instanceof KeyValuesReader);
+      Preconditions.checkState(inputReader2 instanceof KeyValuesReader);
+      LogicalOutput lo = getOutputs().get(joinOutput);
+      Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
+      KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
+
+      join((KeyValuesReader) inputReader1, (KeyValuesReader) inputReader2,
+          writer);
+    }
+
+    /**
+     * Join 2 sorted inputs both from {@link KeyValuesReader} and write output
+     * using {@link KeyValueWriter}
+     * 
+     * @param inputReader1
+     * @param inputReader2
+     * @param writer
+     * @throws IOException
+     */
+    private void join(KeyValuesReader inputReader1,
+        KeyValuesReader inputReader2, KeyValueWriter writer) throws IOException {
+
+      while (inputReader1.next() && inputReader2.next()) {
+        Text value1 = (Text) inputReader1.getCurrentKey();
+        Text value2 = (Text) inputReader2.getCurrentKey();
+        boolean reachEnd = false;
+        // move the cursors of the 2 inputs forward until the same value is
+        // found or one of them reaches the end.
+        while (value1.compareTo(value2) != 0) {
+          if (value1.compareTo(value2) > 0) {
+            if (inputReader2.next()) {
+              value2 = (Text) inputReader2.getCurrentKey();
+            } else {
+              reachEnd = true;
+              break;
+            }
+          } else {
+            if (inputReader1.next()) {
+              value1 = (Text) inputReader1.getCurrentKey();
+            } else {
+              reachEnd = true;
+              break;
+            }
+          }
+        }
+
+        if (reachEnd) {
+          break;
+        } else {
+          writer.write(value1, NullWritable.get());
+        }
+      }
+    }
+  }
+}
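
The merge logic in SortMergeJoinProcessor.join() above can be read in
isolation: both inputs arrive sorted with unique keys, and the side with the
smaller current key is advanced until the keys match or either input is
exhausted. A minimal standalone sketch of the same loop over two plain sorted
Java iterators (hypothetical inputs):

    import java.util.Arrays;
    import java.util.Iterator;

    public class MergeIntersectSketch {
      public static void main(String[] args) {
        // Both inputs must be sorted and contain unique keys, matching the
        // requirement stated in the SortMergeJoinProcessor javadoc.
        Iterator<String> it1 = Arrays.asList("a", "b", "d", "f").iterator();
        Iterator<String> it2 = Arrays.asList("b", "c", "d", "e").iterator();
        while (it1.hasNext() && it2.hasNext()) {
          String v1 = it1.next();
          String v2 = it2.next();
          boolean reachEnd = false;
          // advance the smaller side until the keys match or one input ends
          while (v1.compareTo(v2) != 0) {
            if (v1.compareTo(v2) > 0) {
              if (it2.hasNext()) { v2 = it2.next(); } else { reachEnd = true; break; }
            } else {
              if (it1.hasNext()) { v1 = it1.next(); } else { reachEnd = true; break; }
            }
          }
          if (reachEnd) {
            break;
          }
          System.out.println(v1); // prints: b, d
        }
      }
    }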

http://git-wip-us.apache.org/repos/asf/tez/blob/4023898c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 1fbacdf..56f62a4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -52,8 +52,9 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.examples.SimpleSessionExample;
 import org.apache.tez.examples.JoinDataGen;
-import org.apache.tez.examples.JoinExample;
+import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.examples.SortMergeJoinExample;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -117,13 +118,13 @@ public class TestTezJobs {
   }
 
   @Test(timeout = 60000)
-  public void testIntersectExample() throws Exception {
-    JoinExample intersectExample = new JoinExample();
-    intersectExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+  public void testHashJoinExample() throws Exception {
+    HashJoinExample hashJoinExample = new HashJoinExample();
+    hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
     Path stagingDirPath = new Path("/tmp/tez-staging-dir");
-    Path inPath1 = new Path("/tmp/inPath1");
-    Path inPath2 = new Path("/tmp/inPath2");
-    Path outPath = new Path("/tmp/outPath");
+    Path inPath1 = new Path("/tmp/hashJoin/inPath1");
+    Path inPath2 = new Path("/tmp/hashJoin/inPath2");
+    Path outPath = new Path("/tmp/hashJoin/outPath");
     remoteFs.mkdirs(inPath1);
     remoteFs.mkdirs(inPath2);
     remoteFs.mkdirs(stagingDirPath);
@@ -152,7 +153,7 @@ public class TestTezJobs {
     String[] args = new String[] {
         "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
         inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
-    assertEquals(0, intersectExample.run(args));
+    assertEquals(0, hashJoinExample.run(args));
 
     FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
       public boolean accept(Path p) {
@@ -172,10 +173,119 @@ public class TestTezJobs {
     assertEquals(0, expectedResult.size());
   }
 
+  @Test(timeout = 60000)
+  public void testSortMergeJoinExample() throws Exception {
+    SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
+    sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    Path inPath1 = new Path("/tmp/sortMerge/inPath1");
+    Path inPath2 = new Path("/tmp/sortMerge/inPath2");
+    Path outPath = new Path("/tmp/sortMerge/outPath");
+    remoteFs.mkdirs(inPath1);
+    remoteFs.mkdirs(inPath2);
+    remoteFs.mkdirs(stagingDirPath);
+
+    Set<String> expectedResult = new HashSet<String>();
+
+    FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
+    FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
+    BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
+    BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2));
+    for (int i = 0; i < 20; i++) {
+      String term = "term" + i;
+      writer1.write(term);
+      writer1.newLine();
+      if (i % 2 == 0) {
+        writer2.write(term);
+        writer2.newLine();
+        expectedResult.add(term);
+      }
+    }
+    writer1.close();
+    writer2.close();
+    out1.close();
+    out2.close();
+
+    String[] args = new String[] {
+        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+        inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+    assertEquals(0, sortMergeJoinExample.run(args));
+
+    FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    assertEquals(1, statuses.length);
+    FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      assertTrue(expectedResult.remove(line));
+    }
+    reader.close();
+    inStream.close();
+    assertEquals(0, expectedResult.size());
+  }
+
+  /**
+   * Test the whole {@link HashJoinExample} pipeline as follows: <br>
+   * {@link JoinDataGen} -> {@link HashJoinExample} -> {@link JoinValidate}
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testHashJoinExamplePipeline() throws Exception {
+
+    Path testDir = new Path("/tmp/testHashJoinExample");
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    TezClient tezSession = null;
+    try {
+      tezSession = TezClient.create("HashJoinExampleSession", tezConf, true);
+      tezSession.start();
+
+      JoinDataGen dataGen = new JoinDataGen();
+      String[] dataGenArgs = new String[] {
+          dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+          expectedOutputPath.toString(), "2" };
+      assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+      HashJoinExample joinExample = new HashJoinExample();
+      String[] args = new String[] {
+          dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
+      assertEquals(0, joinExample.run(tezConf, args, tezSession));
+
+      JoinValidate joinValidate = new JoinValidate();
+      String[] validateArgs = new String[] {
+          expectedOutputPath.toString(), outPath.toString(), "3" };
+      assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
+
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
+  /**
+   * Test the whole {@link SortMergeJoinExample} pipeline as follows: <br>
+   * {@link JoinDataGen} -> {@link SortMergeJoinExample} -> {@link JoinValidate}
+   * @throws Exception
+   */
   @Test(timeout = 120000)
-  public void testIntersect2() throws Exception {
+  public void testSortMergeJoinExamplePipeline() throws Exception {
 
-    Path testDir = new Path("/tmp/testIntersect2");
+    Path testDir = new Path("/tmp/testSortMergeExample");
     Path stagingDirPath = new Path("/tmp/tez-staging-dir");
     remoteFs.mkdirs(stagingDirPath);
     remoteFs.mkdirs(testDir);
@@ -189,7 +299,7 @@ public class TestTezJobs {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     TezClient tezSession = null;
     try {
-      tezSession = TezClient.create("IntersectExampleSession", tezConf);
+      tezSession = TezClient.create("SortMergeExampleSession", tezConf, true);
       tezSession.start();
 
       JoinDataGen dataGen = new JoinDataGen();
@@ -198,15 +308,15 @@ public class TestTezJobs {
           expectedOutputPath.toString(), "2" };
       assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
 
-      JoinExample intersect = new JoinExample();
-      String[] intersectArgs = new String[] {
+      SortMergeJoinExample joinExample = new SortMergeJoinExample();
+      String[] args = new String[] {
           dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
-      assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession));
+      assertEquals(0, joinExample.run(tezConf, args, tezSession));
 
-      JoinValidate intersectValidate = new JoinValidate();
-      String[] intersectValidateArgs = new String[] {
+      JoinValidate joinValidate = new JoinValidate();
+      String[] validateArgs = new String[] {
           expectedOutputPath.toString(), outPath.toString(), "3" };
-      assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));
+      assertEquals(0, joinValidate.run(tezConf, validateArgs, tezSession));
 
     } finally {
       if (tezSession != null) {