You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/04/02 14:17:14 UTC
git commit: CRUNCH-373 Detach values in MapsideJoin
Repository: crunch
Updated Branches:
refs/heads/master b877dbdfd -> 3f86cf9e6
CRUNCH-373 Detach values in MapsideJoin
Use detached values in the materialized map of values in the
MapsideJoinStrategy so that non-primitive values are correctly
handled as separate instances.
Add a test derived from the test case provided by Rachit Soni.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3f86cf9e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3f86cf9e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3f86cf9e
Branch: refs/heads/master
Commit: 3f86cf9e655bdae5af2617f46f5f256d394abcf0
Parents: b877dbd
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue Apr 1 20:23:21 2014 -0500
Committer: Gabriel Reid <gr...@apache.org>
Committed: Wed Apr 2 13:47:09 2014 +0200
----------------------------------------------------------------------
.../crunch/lib/join/MapsideJoinStrategyIT.java | 65 +++++++++++++++++++-
.../crunch/lib/join/MapsideJoinStrategy.java | 12 +++-
2 files changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/3f86cf9e/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index 9add60a..f9caa3a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -21,11 +21,16 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import com.google.common.collect.Lists;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
@@ -36,14 +41,14 @@ import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import sun.print.resources.serviceui;
-
public class MapsideJoinStrategyIT {
private static String saveTempDir;
@@ -219,6 +224,60 @@ public class MapsideJoinStrategyIT {
new MapsideJoinStrategy<Integer, String, String>(true));
}
+ @Test
+ public void testMapSideJoinWithImmutableBytesWritable() throws IOException, InterruptedException {
+ // Write the same four lines (value0 .. value3) to both input files
+ FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+ Path path1 = tmpDir.getPath("input1.txt");
+ Path path2 = tmpDir.getPath("input2.txt");
+
+ OutputStream out1 = fs.create(path1, true);
+ OutputStream out2 = fs.create(path2, true);
+
+ for(int i = 0; i < 4; i++){
+ byte[] value = ("value" + i + "\n").getBytes();
+ out1.write(value);
+ out2.write(value);
+ }
+
+ out1.flush();
+ out1.close();
+ out2.flush();
+ out2.close();
+
+ final MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
+
+ final PCollection<String> values1 = pipeline.readTextFile(path1.toString());
+ final PCollection<String> values2 = pipeline.readTextFile(path2.toString());
+
+ final PTable<Text, Text> convertedValues1 = convertStringToText(values1);
+ final PTable<Text, Text> convertedValues2 = convertStringToText(values2);
+
+ // Map-side inner join on Text keys/values (the test name mentions ImmutableBytesWritable, but Text is what is used here)
+ final MapsideJoinStrategy<Text, Text, Text> mapSideJoinStrategy = MapsideJoinStrategy.<Text, Text, Text>create();
+
+ final PTable<Text, Pair<Text, Text>> updatedJoinedRows = mapSideJoinStrategy.join(convertedValues1, convertedValues2, JoinType.INNER_JOIN);
+ pipeline.run();
+
+ // Both inputs contain the same 4 distinct lines, each used as a join key,
+ // so the joined table is expected to materialize to a map with 4 entries
+ assertEquals(4, updatedJoinedRows.materializeToMap().size());
+ }
+
+ /**
+ * Converts each input string into a (Text, Text) table entry whose key and value both wrap that string.
+ */
+ public static PTable<Text, Text> convertStringToText(final PCollection<String> entityKeysStringPCollection) {
+ return entityKeysStringPCollection.parallelDo(new DoFn<String, Pair<Text, Text>>() {
+
+ @Override
+ public void process(final String input, final Emitter<Pair<Text, Text>> emitter) {
+ emitter.emit(new Pair<Text, Text>(new Text(input), new Text(input)));
+ }
+ }, Writables.tableOf(Writables.writables(Text.class), Writables.writables(Text.class)));
+ }
+
+
private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
MapsideJoinStrategy<Integer,String, String> joinStrategy) {
PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
http://git-wip-us.apache.org/repos/asf/crunch/blob/3f86cf9e/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
index cafb4f9..d161349 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -27,6 +27,7 @@ import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
@@ -119,7 +120,8 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
private PTable<K, Pair<U,V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) {
PTypeFamily tf = left.getTypeFamily();
ReadableData<Pair<K, V>> rightReadable = right.asReadable(materialize);
- MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(rightReadable, includeUnmatchedLeftValues);
+ MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(
+ rightReadable, right.getPTableType(), includeUnmatchedLeftValues);
ParallelDoOptions options = ParallelDoOptions.builder()
.sourceTargets(rightReadable.getSourceTargets())
.build();
@@ -131,11 +133,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
private final ReadableData<Pair<K, V>> readable;
+ private final PTableType<K, V> tableType;
private final boolean includeUnmatched;
private Multimap<K, V> joinMap;
- public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, boolean includeUnmatched) {
+ public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, PTableType<K, V> tableType, boolean includeUnmatched) {
this.readable = rs;
+ this.tableType = tableType;
this.includeUnmatched = includeUnmatched;
}
@@ -147,11 +151,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
@Override
public void initialize() {
super.initialize();
+ tableType.initialize(getConfiguration());
joinMap = ArrayListMultimap.create();
try {
for (Pair<K, V> joinPair : readable.read(getContext())) {
- joinMap.put(joinPair.first(), joinPair.second());
+ Pair<K, V> detachedPair = tableType.getDetachedValue(joinPair);
+ joinMap.put(detachedPair.first(), detachedPair.second());
}
} catch (IOException e) {
throw new CrunchRuntimeException("Error reading map-side join data", e);