You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink on the word "here" in the original page and is not preserved in this text.)
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/04/02 14:17:14 UTC

git commit: CRUNCH-373 Detach values in MapsideJoin

Repository: crunch
Updated Branches:
  refs/heads/master b877dbdfd -> 3f86cf9e6


CRUNCH-373 Detach values in MapsideJoin

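Store detached copies of the right-side key/value pairs in the in-memory
join map built by MapsideJoinStrategy, so that non-primitive keys and
values (e.g. Hadoop Writables) are retained as separate instances rather
than as references to objects that may be reused while the data is read.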

Add an integration test derived from a test case provided by Rachit Soni.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3f86cf9e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3f86cf9e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3f86cf9e

Branch: refs/heads/master
Commit: 3f86cf9e655bdae5af2617f46f5f256d394abcf0
Parents: b877dbd
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue Apr 1 20:23:21 2014 -0500
Committer: Gabriel Reid <gr...@apache.org>
Committed: Wed Apr 2 13:47:09 2014 +0200

----------------------------------------------------------------------
 .../crunch/lib/join/MapsideJoinStrategyIT.java  | 65 +++++++++++++++++++-
 .../crunch/lib/join/MapsideJoinStrategy.java    | 12 +++-
 2 files changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3f86cf9e/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index 9add60a..f9caa3a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -21,11 +21,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 
+import com.google.common.collect.Lists;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
@@ -36,14 +41,14 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import sun.print.resources.serviceui;
-
 public class MapsideJoinStrategyIT {
   
   private static String saveTempDir;
@@ -219,6 +224,60 @@ public class MapsideJoinStrategyIT {
                                   new MapsideJoinStrategy<Integer, String, String>(true));
   }
 
+  @Test
+  public void testMapSideJoinWithImmutableBytesWritable() throws IOException, InterruptedException {
+    //Write out input files
+    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+    Path path1 = tmpDir.getPath("input1.txt");
+    Path path2 = tmpDir.getPath("input2.txt");
+
+    OutputStream out1 = fs.create(path1, true);
+    OutputStream out2 = fs.create(path2, true);
+
+    for(int i = 0; i < 4; i++){
+      byte[] value = ("value" + i + "\n").getBytes();
+      out1.write(value);
+      out2.write(value);
+    }
+
+    out1.flush();
+    out1.close();
+    out2.flush();
+    out2.close();
+
+    final MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
+
+    final PCollection<String> values1 = pipeline.readTextFile(path1.toString());
+    final PCollection<String> values2 = pipeline.readTextFile(path2.toString());
+
+    final PTable<Text, Text> convertedValues1 = convertStringToText(values1);
+    final PTable<Text, Text> convertedValues2 = convertStringToText(values2);
+
+    // for map side join
+    final MapsideJoinStrategy<Text, Text, Text> mapSideJoinStrategy = MapsideJoinStrategy.<Text, Text, Text>create();
+
+    final PTable<Text, Pair<Text, Text>> updatedJoinedRows = mapSideJoinStrategy.join(convertedValues1, convertedValues2, JoinType.INNER_JOIN);
+    pipeline.run();
+
+    // Join should have 2 results
+    // Join should have contentBytes1 and contentBytes2
+    assertEquals(4, updatedJoinedRows.materializeToMap().size());
+  }
+
+  /**
+   * The method is used to convert string to entity key
+   */
+  public static PTable<Text, Text> convertStringToText(final PCollection<String> entityKeysStringPCollection) {
+    return entityKeysStringPCollection.parallelDo(new DoFn<String, Pair<Text, Text>>() {
+
+      @Override
+      public void process(final String input, final Emitter<Pair<Text, Text>> emitter) {
+        emitter.emit(new Pair<Text, Text>(new Text(input), new Text(input)));
+      }
+    }, Writables.tableOf(Writables.writables(Text.class), Writables.writables(Text.class)));
+  }
+
+
   private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
                               MapsideJoinStrategy<Integer,String, String> joinStrategy) {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");

http://git-wip-us.apache.org/repos/asf/crunch/blob/3f86cf9e/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
index cafb4f9..d161349 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -27,6 +27,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
 
@@ -119,7 +120,8 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
   private PTable<K, Pair<U,V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) {
     PTypeFamily tf = left.getTypeFamily();
     ReadableData<Pair<K, V>> rightReadable = right.asReadable(materialize);
-    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(rightReadable, includeUnmatchedLeftValues);
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(
+              rightReadable, right.getPTableType(), includeUnmatchedLeftValues);
     ParallelDoOptions options = ParallelDoOptions.builder()
         .sourceTargets(rightReadable.getSourceTargets())
         .build();
@@ -131,11 +133,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
   static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
 
     private final ReadableData<Pair<K, V>> readable;
+    private final PTableType<K, V> tableType;
     private final boolean includeUnmatched;
     private Multimap<K, V> joinMap;
 
-    public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, boolean includeUnmatched) {
+    public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, PTableType<K, V> tableType, boolean includeUnmatched) {
       this.readable = rs;
+      this.tableType = tableType;
       this.includeUnmatched = includeUnmatched;
     }
 
@@ -147,11 +151,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
     @Override
     public void initialize() {
       super.initialize();
+      tableType.initialize(getConfiguration());
 
       joinMap = ArrayListMultimap.create();
       try {
         for (Pair<K, V> joinPair : readable.read(getContext())) {
-          joinMap.put(joinPair.first(), joinPair.second());
+          Pair<K, V> detachedPair = tableType.getDetachedValue(joinPair);
+          joinMap.put(detachedPair.first(), detachedPair.second());
         }
       } catch (IOException e) {
         throw new CrunchRuntimeException("Error reading map-side join data", e);