Posted to commits@asterixdb.apache.org by xi...@apache.org on 2018/11/15 22:59:48 UTC

[1/2] asterixdb git commit: [NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin

Repository: asterixdb
Updated Branches:
  refs/heads/master 7d75792f1 -> 0dec33a96


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 6154e28..a87dc1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,12 +19,14 @@
 package org.apache.hyracks.tests.integration;
 
 import java.io.File;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,8 +34,10 @@ import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -46,9 +50,12 @@ import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.tests.util.NoopMissingWriterFactory;
 import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
@@ -68,6 +75,40 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
      * NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
 
+    private static boolean DEBUG = false;
+
+    static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor ordersDesc =
+            new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor custOrderJoinDesc =
+            new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+    static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()];
+    static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()];
+
+    static {
+        Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE);
+        Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE);
+    }
+
     @Test
     public void customerOrderCIDJoin() throws Exception {
         JobSpecification spec = new JobSpecification();
@@ -75,50 +116,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -155,57 +163,28 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null,
+                false, null);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -235,50 +214,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -320,50 +266,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -371,11 +284,14 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
             nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
         }
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, true, nonMatchWriterFactories);
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null,
+                true, nonMatchWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -408,11 +324,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -420,41 +331,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -498,11 +381,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -510,48 +388,24 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null,
+                false, null);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -588,11 +442,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -600,41 +449,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -678,11 +499,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -690,41 +506,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
@@ -769,4 +557,161 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addRoot(printer);
         runTest(spec);
     }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer4.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case1";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case2() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case2";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case3() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders1.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case3";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    private IOperatorDescriptor getPrinter(JobSpecification spec, String path) {
+        IFileSplitProvider outputSplitProvider =
+                new ConstantFileSplitProvider(new FileSplit[] { new ManagedFileSplit(NC1_ID, path) });
+
+        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
+                : new NullSinkOperatorDescriptor(spec);
+    }
 }


[2/2] asterixdb git commit: [NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin

Posted by xi...@apache.org.
[NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The HybridHashJoinOperatorDescriptor is an old implementation that has
not been used by the runtime, and it lacks the necessary documentation
and memory management. The OptimizedHybridHashJoinOperatorDescriptor
serves the same purpose; we should use it instead and avoid
maintaining the old one.
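
For illustration, here is a minimal sketch of constructing the
replacement operator, mirroring the constructor call used in the
rewritten integration test above. The frame budget of 6, the input
size estimate of 122 frames, and the 1.2 fudge factor are test values
taken from that diff, not tuning recommendations; the roles given to
the numeric arguments in the comments are inferred from the analogous
parameters of the removed descriptor.

    // Sketch: the optimized hybrid hash join as wired in
    // TPCHCustomerOrderHashJoinTest (Case3 above).
    // 6 = memory budget in frames, 122 = input size estimate in
    // frames, 1.2 = fudge factor applied to that estimate.
    OptimizedHybridHashJoinOperatorDescriptor join =
            new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122, 1.2,
                    new int[] { 0 }, // join key: field 0 of input 0 (customer)
                    new int[] { 1 }, // join key: field 1 of input 1 (orders)
                    new IBinaryHashFunctionFamily[] {
                            UTF8StringBinaryHashFunctionFamily.INSTANCE },
                    new IBinaryComparatorFactory[] {
                            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                    custOrderJoinDesc,
                    new JoinComparatorFactory(
                            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                    new JoinComparatorFactory(
                            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
                    null); // no predicate evaluator factory

Note the interface difference visible in the diffs below: the
optimized descriptor takes IBinaryHashFunctionFamily instances, which
can generate a distinct hash function for each recursive
repartitioning round, instead of plain IBinaryHashFunctionFactory
instances, and it drops the aveRecordsPerFrame hint entirely.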

Change-Id: I6ed612cc233af1b78d453c7b711077b82e721e82
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3023
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0dec33a9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0dec33a9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0dec33a9

Branch: refs/heads/master
Commit: 0dec33a9644d4b45ae809c3c43eb304b3ae05850
Parents: 7d75792
Author: Xikui Wang <xk...@gmail.com>
Authored: Thu Nov 15 13:32:01 2018 -0800
Committer: Xikui Wang <xk...@gmail.com>
Committed: Thu Nov 15 14:59:11 2018 -0800

----------------------------------------------------------------------
 .../physical/HybridHashJoinPOperator.java       |  44 +-
 .../join/HybridHashJoinOperatorDescriptor.java  | 577 -------------------
 ...TPCHCustomerOptimizedHybridHashJoinTest.java | 245 --------
 .../TPCHCustomerOrderHashJoinTest.java          | 545 ++++++++----------
 4 files changed, 247 insertions(+), 1164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 45ec44b..091cc44 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -45,7 +45,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
@@ -55,7 +54,6 @@ import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -65,7 +63,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
     // The maximum number of in-memory frames that this hash join can use.
     private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
-    private final int aveRecordsPerFrame;
     private final double fudgeFactor;
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -76,7 +73,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.memSizeInFrames = memSizeInFrames;
         this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
-        this.aveRecordsPerFrame = aveRecordsPerFrame;
         this.fudgeFactor = fudgeFactor;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
@@ -117,8 +113,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IBinaryHashFunctionFactory[] hashFunFactories =
-                JobGenHelper.variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
         IBinaryHashFunctionFamily[] hashFunFamilies =
                 JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch, env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
@@ -138,21 +132,9 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
-        boolean optimizedHashJoin = true;
-        for (IBinaryHashFunctionFamily family : hashFunFamilies) {
-            if (family == null) {
-                optimizedHashJoin = false;
-                break;
-            }
-        }
 
-        if (optimizedHashJoin) {
-            opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        } else {
-            opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        }
+        opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
+                comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
         opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -162,28 +144,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         builder.contributeGraphEdge(src2, 0, op, 1);
     }
 
-    private IOperatorDescriptor generateHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
-            int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories,
-            IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
-            RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
-        switch (kind) {
-            case INNER:
-                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
-            case LEFT_OUTER:
-                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
-                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                }
-                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
-            default:
-                throw new NotImplementedException();
-        }
-    }
-
     private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
             int[] keysLeft, int[] keysRight, IBinaryHashFunctionFamily[] hashFunFamilies,
             IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
deleted file mode 100644
index 034b054..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-
-public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
-    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
-    private final int memsize;
-    private static final long serialVersionUID = 1L;
-    private final int inputsize0;
-    private final double factor;
-    private final int recordsPerFrame;
-    private final int[] keys0;
-    private final int[] keys1;
-    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
-    private final boolean isLeftOuter;
-    private final IMissingWriterFactory[] nonMatchWriterFactories1;
-
-    /**
-     * @param spec
-     * @param memsize
-     *            in frames
-     * @param inputsize0
-     *            in frames
-     * @param recordsPerFrame
-     * @param factor
-     * @param keys0
-     * @param keys1
-     * @param hashFunctionFactories
-     * @param comparatorFactories
-     * @param recordDescriptor
-     */
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
-            IMissingWriterFactory[] nullWriterFactories1) {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.factor = factor;
-        this.recordsPerFrame = recordsPerFrame;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = isLeftOuter;
-        this.nonMatchWriterFactories1 = nullWriterFactories1;
-        outRecDescs[0] = recordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
-        ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid);
-
-        builder.addActivity(this, phase1);
-        builder.addSourceEdge(1, phase1, 0);
-
-        builder.addActivity(this, phase2);
-        builder.addSourceEdge(0, phase2, 0);
-
-        builder.addBlockingEdge(phase1, phase2);
-
-        builder.addTargetEdge(0, phase2, 0);
-    }
-
-    public static class BuildAndPartitionTaskState extends AbstractStateObject {
-        private RunFileWriter[] fWriters;
-        private InMemoryHashJoin joiner;
-        private int nPartitions;
-        private int memoryForHashtable;
-
-        public BuildAndPartitionTaskState() {
-        }
-
-        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-
-    }
-
-    private class BuildAndPartitionActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId joinAid;
-
-        public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) {
-            super(id);
-            this.joinAid = joinAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
-                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
-                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
-                private final ITuplePartitionComputer hpcBuild =
-                        new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
-                private final FrameTupleAppender appender = new FrameTupleAppender();
-                private final FrameTupleAppender ftappender = new FrameTupleAppender();
-                private IFrame[] bufferForPartitions;
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (state.memoryForHashtable != 0) {
-                        build(inBuffer.getBuffer());
-                    }
-
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                        accessorBuild.reset(buf);
-                        if (accessorBuild.getTupleCount() > 0) {
-                            write(i, buf);
-                        }
-                        closeWriter(i);
-                    }
-
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorBuild.reset(buffer);
-                        int tCount = accessorBuild.getTupleCount();
-                        for (int i = 0; i < tCount; ++i) {
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame bufBi = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessorBuild, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, bufBi.getBuffer());
-                                        bufBi.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftappender.append(accessorBuild, i)) {
-                                            build(inBuffer.getBuffer());
-
-                                            ftappender.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame bufBi = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessorBuild, i)) {
-                                            break;
-                                        } else {
-                                            write(entry, bufBi.getBuffer());
-                                            bufBi.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-
-                        }
-                    } else {
-                        build(buffer);
-                    }
-
-                }
-
-                private void build(ByteBuffer inBuffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity());
-                    FrameUtils.copyAndFlip(inBuffer, copyBuffer);
-                    state.joiner.build(copyBuffer);
-                }
-
-                @Override
-                public void open() throws HyracksDataException {
-                    if (memsize > 1) {
-                        if (memsize > inputsize0) {
-                            state.nPartitions = 0;
-                        } else {
-                            state.nPartitions =
-                                    (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
-                        }
-                        if (state.nPartitions <= 0) {
-                            // becomes in-memory HJ
-                            state.memoryForHashtable = memsize - 2;
-                            state.nPartitions = 0;
-                        } else {
-                            state.memoryForHashtable = memsize - state.nPartitions - 2;
-                            if (state.memoryForHashtable < 0) {
-                                state.memoryForHashtable = 0;
-                                state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-                            }
-                        }
-                    } else {
-                        throw new HyracksDataException("not enough memory");
-                    }
-
-                    ITuplePartitionComputer hpc0 =
-                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx);
-                    ITuplePartitionComputer hpc1 =
-                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
-                    int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
-                    ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
-                    state.joiner =
-                            new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1),
-                                    rd1, hpc1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1, table, predEvaluator, null);
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    state.fWriters = new RunFileWriter[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-
-                    ftappender.reset(inBuffer, true);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer == null) {
-                        FileReference file = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIoManager());
-                        writer.open();
-                        state.fWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-            };
-            return op;
-        }
-    }
-
-    private class PartitionAndJoinActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId buildAid;
-
-        public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
-            super(id);
-            this.buildAid = buildAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private BuildAndPartitionTaskState state;
-                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
-                private final ITuplePartitionComputerFactory hpcf0 =
-                        new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
-                private final ITuplePartitionComputerFactory hpcf1 =
-                        new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
-                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(ctx);
-
-                private final FrameTupleAppender appender = new FrameTupleAppender();
-                private final FrameTupleAppender ftap = new FrameTupleAppender();
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-                private final IFrame outBuffer = new VSizeFrame(ctx);
-                private RunFileWriter[] buildWriters;
-                private RunFileWriter[] probeWriters;
-                private IFrame[] bufferForPartitions;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    writer.open();
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-                    buildWriters = state.fWriters;
-                    probeWriters = new RunFileWriter[state.nPartitions];
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-                    appender.reset(outBuffer, true);
-                    ftap.reset(inBuffer, true);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorProbe.reset(buffer);
-                        int tupleCount0 = accessorProbe.getTupleCount();
-                        for (int i = 0; i < tupleCount0; ++i) {
-
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame outbuf = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessorProbe, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, outbuf.getBuffer());
-                                        outbuf.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftap.append(accessorProbe, i)) {
-                                            state.joiner.join(inBuffer.getBuffer(), writer);
-                                            ftap.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame outbuf = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessorProbe, i)) {
-                                            break;
-                                        } else {
-                                            write(entry, outbuf.getBuffer());
-                                            outbuf.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    } else {
-                        state.joiner.join(buffer, writer);
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    try {
-                        try {
-                            state.joiner.join(inBuffer.getBuffer(), writer);
-                            state.joiner.completeJoin(writer);
-                        } finally {
-                            state.joiner.releaseMemory();
-                        }
-                        ITuplePartitionComputer hpcRep0 =
-                                new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx);
-                        ITuplePartitionComputer hpcRep1 =
-                                new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx);
-                        if (state.memoryForHashtable != memsize - 2) {
-                            for (int i = 0; i < state.nPartitions; i++) {
-                                ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                                accessorProbe.reset(buf);
-                                if (accessorProbe.getTupleCount() > 0) {
-                                    write(i, buf);
-                                }
-                                closeWriter(i);
-                            }
-
-                            inBuffer.reset();
-                            int tableSize = -1;
-                            if (state.memoryForHashtable == 0) {
-                                tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
-                            } else {
-                                tableSize = (int) (memsize * recordsPerFrame * factor);
-                            }
-                            ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
-                            for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
-                                RunFileWriter buildWriter = buildWriters[partitionid];
-                                RunFileWriter probeWriter = probeWriters[partitionid];
-                                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                                    continue;
-                                }
-                                table.reset();
-                                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0),
-                                        hpcRep0, new FrameTupleAccessor(rd1), rd1, hpcRep1,
-                                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                        nullWriters1, table, predEvaluator, null);
-
-                                if (buildWriter != null) {
-                                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                    try {
-                                        buildReader.open();
-                                        while (buildReader.nextFrame(inBuffer)) {
-                                            ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                            FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                            joiner.build(copyBuffer);
-                                            inBuffer.reset();
-                                        }
-                                    } finally {
-                                        buildReader.close();
-                                    }
-                                }
-
-                                // probe
-                                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                                try {
-                                    probeReader.open();
-                                    try {
-                                        while (probeReader.nextFrame(inBuffer)) {
-                                            joiner.join(inBuffer.getBuffer(), writer);
-                                            inBuffer.reset();
-                                        }
-                                        joiner.completeJoin(writer);
-                                    } finally {
-                                        joiner.releaseMemory();
-                                    }
-                                } finally {
-                                    probeReader.close();
-                                }
-                            }
-                        }
-                    } finally {
-                        writer.close();
-                    }
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer == null) {
-                        FileReference file =
-                                ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIoManager());
-                        writer.open();
-                        probeWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    writer.fail();
-                }
-            };
-            return op;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
deleted file mode 100644
index 289f8ae..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.File;
-import java.util.Arrays;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
-import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import org.junit.Test;
-
-public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegrationTest {
-
-    private static boolean DEBUG = false;
-
-    static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-    static RecordDescriptor ordersDesc =
-            new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-    static RecordDescriptor custOrderJoinDesc =
-            new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-    static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()];
-    static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()];
-
-    static {
-        Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE);
-        Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE);
-    }
-
-    private IOperatorDescriptor getPrinter(JobSpecification spec, String path) {
-        IFileSplitProvider outputSplitProvider =
-                new ConstantFileSplitProvider(new FileSplit[] { new ManagedFileSplit(NC1_ID, path) });
-
-        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
-                : new NullSinkOperatorDescriptor(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + "customer4.tbl") };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case1";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case2() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case2";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case3() throws Exception {
-
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders1.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case3";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-}