You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/02/18 23:35:49 UTC
git commit: CRUNCH-216 Transpose args in MapsideJoinStrategy
Repository: crunch
Updated Branches:
refs/heads/master 296abb86c -> 799b1232f
CRUNCH-216 Transpose args in MapsideJoinStrategy
Initial move in transposing the parameters for the
MapsideJoinStrategy to bring it in line with other join
strategies (i.e. the left-side table should be the smaller of
the two tables being joined).
Introduce static factory methods for creating MapsideJoinStrategy
instances that load the left-side table into memory, and deprecate
the existing public constructor to warn users of the future change.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/799b1232
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/799b1232
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/799b1232
Branch: refs/heads/master
Commit: 799b1232f1aa6179759506722709f26a858adedf
Parents: 296abb8
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat Feb 15 23:24:48 2014 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue Feb 18 23:30:17 2014 +0100
----------------------------------------------------------------------
.../crunch/lib/join/MapsideJoinStrategyIT.java | 186 ++++++++++++++++---
.../crunch/lib/join/MapsideJoinStrategy.java | 92 ++++++++-
2 files changed, 247 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index 9972549..9add60a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -42,6 +42,7 @@ import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Lists;
+import sun.print.resources.serviceui;
public class MapsideJoinStrategyIT {
@@ -91,22 +92,44 @@ public class MapsideJoinStrategyIT {
@Test
public void testMapSideJoin_MemPipeline() {
- runMapsideJoin(MemPipeline.getInstance(), true, false);
+ runMapsideJoin(MemPipeline.getInstance(), true, false, MapsideJoinStrategy.<Integer,String,String>create(false));
+ }
+
+ @Test
+ public void testLegacyMapSideJoin_MemPipeline() {
+ runLegacyMapsideJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer, String, String>(false));
}
@Test
public void testMapSideJoin_MemPipeline_Materialized() {
- runMapsideJoin(MemPipeline.getInstance(), true, true);
+ runMapsideJoin(MemPipeline.getInstance(), true, true, MapsideJoinStrategy.<Integer,String,String>create(true));
+ }
+
+ @Test
+ public void testLegacyMapSideJoin_MemPipeline_Materialized() {
+ runLegacyMapsideJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer, String, String>(true));
}
@Test
- public void testMapSideJoinLeftOuterJoin_MemPipeline() {
- runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false);
+ public void testMapSideJoinRightOuterJoin_MemPipeline() {
+ runMapsideRightOuterJoin(MemPipeline.getInstance(), true, false,
+ MapsideJoinStrategy.<Integer, String, String>create(false));
+ }
+
+ @Test
+ public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline() {
+ runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer, String, String>(false));
+ }
+
+ @Test
+ public void testMapSideJoinRightOuterJoin_MemPipeline_Materialized() {
+ runMapsideRightOuterJoin(MemPipeline.getInstance(), true, true,
+ MapsideJoinStrategy.<Integer, String, String>create(true));
}
@Test
- public void testMapSideJoinLeftOuterJoin_MemPipeline_Materialized() {
- runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true);
+ public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline_Materialized() {
+ runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer, String, String>(true));
}
@Test
@@ -128,35 +151,115 @@ public class MapsideJoinStrategyIT {
}
@Test
+ public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException {
+ MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ PTable<Integer, String> filteredCustomerTable = customerTable
+ .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType());
+
+
+ JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+ PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredCustomerTable,
+ JoinType.INNER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+ assertTrue(materializedJoin.isEmpty());
+ }
+
+ @Test
public void testMapsideJoin() throws IOException {
- runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false);
+ runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, false, MapsideJoinStrategy.<Integer, String, String>create(false));
+ }
+
+ @Test
+ public void testLegacyMapsideJoin() throws IOException {
+ runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, false, new MapsideJoinStrategy<Integer, String, String>(false));
}
@Test
public void testMapsideJoin_Materialized() throws IOException {
- runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true);
+ runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, true, MapsideJoinStrategy.<Integer, String, String>create(true));
+ }
+
+ @Test
+ public void testLegacyMapsideJoin_Materialized() throws IOException {
+ runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, true, new MapsideJoinStrategy<Integer, String, String>(true));
+ }
+
+ @Test
+ public void testMapsideJoin_RightOuterJoin() throws IOException {
+ runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, false, MapsideJoinStrategy.<Integer, String, String>create(false));
+ }
+
+ @Test
+ public void testLegacyMapsideJoin_LeftOuterJoin() throws IOException {
+ runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, false,
+ new MapsideJoinStrategy<Integer, String, String>(false));
}
@Test
- public void testMapsideJoin_LeftOuterJoin() throws IOException {
- runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false);
+ public void testMapsideJoin_RightOuterJoin_Materialized() throws IOException {
+ runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, true, MapsideJoinStrategy.<Integer, String, String>create(true));
}
@Test
- public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
- runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true);
+ public void testLegacyMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
+ runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+ false, true,
+ new MapsideJoinStrategy<Integer, String, String>(true));
}
- private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize) {
+ private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+ MapsideJoinStrategy<Integer,String, String> joinStrategy) {
PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
- JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
- PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN)
+ PTable<Integer, String> custOrders = joinStrategy.join(orderTable, customerTable, JoinType.INNER_JOIN)
+ .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+ PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+ PTable<Integer, Pair<String, String>> joined = joinStrategy.join(ORDER_TABLE, custOrders, JoinType.INNER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+ expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]")));
+ Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+ PipelineResult res = pipeline.run();
+ if (!inMemory) {
+ assertEquals(materialize ? 2 : 1, res.getStageResults().size());
+ }
+
+ List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+ Collections.sort(joinedResultList);
+
+ assertEquals(expectedJoinResult, joinedResultList);
+ }
+
+ private void runLegacyMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+ MapsideJoinStrategy<Integer, String, String> mapsideJoinStrategy) {
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ PTable<Integer, String> custOrders = mapsideJoinStrategy.join(customerTable, orderTable, JoinType.INNER_JOIN)
.mapValues("concat", new ConcatValuesFn(), Writables.strings());
PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
- PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN);
+ PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN);
List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
@@ -166,28 +269,63 @@ public class MapsideJoinStrategyIT {
expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-
+
PipelineResult res = pipeline.run();
if (!inMemory) {
assertEquals(materialize ? 2 : 1, res.getStageResults().size());
}
-
+
List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
Collections.sort(joinedResultList);
assertEquals(expectedJoinResult, joinedResultList);
}
- private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize) {
+ private void runMapsideRightOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+ MapsideJoinStrategy<Integer, String, String> mapsideJoinStrategy) {
PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
- JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
- PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN)
+ PTable<Integer, String> custOrders = mapsideJoinStrategy.join(orderTable, customerTable, JoinType.RIGHT_OUTER_JOIN)
+ .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+ PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+ PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(ORDER_TABLE, custOrders,
+ JoinType.RIGHT_OUTER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+ expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]")));
+ expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]")));
+ expectedJoinResult.add(Pair.of(444, Pair.<String,String>of(null, "[null,Has No Orders]")));
+ Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+ PipelineResult res = pipeline.run();
+ if (!inMemory) {
+ assertEquals(materialize ? 2 : 1, res.getStageResults().size());
+ }
+
+ List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+ Collections.sort(joinedResultList);
+
+ assertEquals(expectedJoinResult, joinedResultList);
+ }
+
+ private void runLegacyMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+ MapsideJoinStrategy<Integer, String, String> legacyMapsideJoinStrategy) {
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ PTable<Integer, String> custOrders = legacyMapsideJoinStrategy.join(customerTable, orderTable,
+ JoinType.LEFT_OUTER_JOIN)
.mapValues("concat", new ConcatValuesFn(), Writables.strings());
PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
- PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
+ PTable<Integer, Pair<String, String>> joined =
+ legacyMapsideJoinStrategy.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
@@ -198,12 +336,12 @@ public class MapsideJoinStrategyIT {
expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]", null)));
Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-
+
PipelineResult res = pipeline.run();
if (!inMemory) {
assertEquals(materialize ? 2 : 1, res.getStageResults().size());
}
-
+
List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
Collections.sort(joinedResultList);
http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
index 680bb2e..cafb4f9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -17,12 +17,12 @@
*/
package org.apache.crunch.lib.join;
-import java.io.IOException;
-import java.util.Collection;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
@@ -30,17 +30,22 @@ import org.apache.crunch.ReadableData;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
/**
* Utility for doing map side joins on a common key between two {@link PTable}s.
* <p>
* A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
+ * one side of the join is loaded into memory and the join is performed in
* a mapper. This style of join has the important implication that the output of
* the join is not sorted, which is the case with a conventional (reducer-based)
* join.
+ * <p/>
+ * Instances of this class should be instantiated via the {@link #create()} or {@link #create(boolean)} factory
+ * methods, or optionally via the deprecated public constructor for backwards compatibility with
+ * older versions of Crunch where the right-side table was loaded into memory. The public constructor will be removed
+ * in a future release.
*/
public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
@@ -49,24 +54,54 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
/**
* Constructs a new instance of the {@code MapsideJoinStratey}, materializing the right-side
* join table to disk before the join is performed.
+ *
+ * @deprecated Use the {@link #create()} factory method instead
*/
+ @Deprecated
public MapsideJoinStrategy() {
this(true);
}
/**
- * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code }materialize}
+ * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code materialize}
* argument is true, then the right-side join {@code PTable} will be materialized to disk
* before the in-memory join is performed. If it is false, then Crunch can optionally read
* and process the data from the right-side table without having to run a job to materialize
* the data to disk first.
*
* @param materialize Whether or not to materialize the right-side table before the join
+ *
+ * @deprecated Use the {@link #create(boolean)} factory method instead
*/
+ @Deprecated
public MapsideJoinStrategy(boolean materialize) {
this.materialize = materialize;
}
+ /**
+ * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory,
+ * and will materialize the contents of the left-side table to disk before running the in-memory join.
+ * <p/>
+ * The smaller of the two tables to be joined should be provided as the left-side table of the created join
+ * strategy instance.
+ */
+ public static <K, U, V> MapsideJoinStrategy<K, U, V> create() {
+ return create(true);
+ }
+
+ /**
+ * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory.
+ * <p/>
+ * If the {@code materialize} parameter is true, then the left-side {@code PTable} will be materialized to disk
+ * before the in-memory join is performed. If it is false, then Crunch can optionally read and process the data
+ * from the left-side table without having to run a job to materialize the data to disk first.
+ *
+ * @param materialize Whether or not to materialize the left-side table before the join
+ */
+ public static <K, U, V> MapsideJoinStrategy<K, U, V> create(boolean materialize) {
+ return new LoadLeftSideMapsideJoinStrategy(materialize);
+ }
+
@Override
public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
switch (joinType) {
@@ -138,4 +173,47 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
}
}
}
+
+ /**
+ * Loads the left-most table (instead of the right-most) in memory while performing the join.
+ */
+ private static class LoadLeftSideMapsideJoinStrategy<K, U, V> extends MapsideJoinStrategy<K, U, V> {
+
+ private MapsideJoinStrategy<K, V, U> mapsideJoinStrategy;
+
+ public LoadLeftSideMapsideJoinStrategy(boolean materialize) {
+ mapsideJoinStrategy = new MapsideJoinStrategy<K, V, U>(materialize);
+ }
+
+ @Override
+ public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
+
+ JoinType reversedJoinType;
+ switch (joinType) {
+ case INNER_JOIN:
+ reversedJoinType = JoinType.INNER_JOIN;
+ break;
+ case RIGHT_OUTER_JOIN:
+ reversedJoinType = JoinType.LEFT_OUTER_JOIN;
+ break;
+ default:
+ throw new UnsupportedOperationException("Join type " + joinType + " is not supported");
+ }
+
+
+ return mapsideJoinStrategy.join(right, left, reversedJoinType)
+ .mapValues("Reverse order out output table values",
+ new ReversePairOrderFn<V, U>(),
+ left.getTypeFamily().pairs(left.getValueType(), right.getValueType()));
+ }
+ }
+
+ private static class ReversePairOrderFn<V, U> extends MapFn<Pair<V, U>, Pair<U, V>> {
+
+ @Override
+ public Pair<U, V> map(Pair<V, U> input) {
+ return Pair.of(input.second(), input.first());
+ }
+
+ }
}