You are viewing a plain-text version of this content. The canonical link for it is given below.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/26 06:53:59 UTC
[02/11] incubator-asterixdb-hyracks git commit: Implemented the
memory-bounded HashGroupby and HashJoin for BigObject
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index a10513a..a4c87c8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.tests.integration;
import java.io.File;
+import java.util.Arrays;
import org.junit.Test;
@@ -43,63 +44,73 @@ import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegrationTest {
- private static final boolean DEBUG = false;
+
+ private static boolean DEBUG = false;
+
+ static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+ static RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer() });
+
+ static RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer() });
+
+ static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()];
+ static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()];
+
+ static {
+ Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE);
+ Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE);
+ }
+
+ private IOperatorDescriptor getPrinter(JobSpecification spec, File file) {
+ IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] {
+ new FileSplit(NC1_ID, file.getAbsolutePath()) });
+
+ return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
+ : new NullSinkOperatorDescriptor(spec);
+ }
@Test
public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
JobSpecification spec = new JobSpecification();
-
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer4.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders4.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
@@ -107,13 +118,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case1");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -127,6 +139,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
@Test
@@ -136,48 +149,18 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders4.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
@@ -185,13 +168,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case2");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -205,6 +189,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
@Test
@@ -215,48 +200,18 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders1.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
@@ -264,13 +219,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case3");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -284,6 +240,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2650799..8232a62 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -292,7 +292,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
+ custOrderJoinDesc, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -815,7 +815,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
+ custOrderJoinDesc, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
index b774e0e..039497c 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
@@ -87,7 +87,6 @@ public class VSizeFrameSortMergeTest extends AbstractIntegrationTest {
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID );
spec.setFrameSize(frameSize);
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, new int[] { 1, 0 },
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
new file mode 100644
index 0000000..b8ec790
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.test.support.TestUtils;
+
+public abstract class AbstractExternalGroupbyTest {
+
+ ISerializerDeserializer[] inFields = new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer(),
+ };
+
+ ISerializerDeserializer[] aggrFields = new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), // key
+ IntegerSerializerDeserializer.INSTANCE, // sum
+ IntegerSerializerDeserializer.INSTANCE, // count
+ FloatSerializerDeserializer.INSTANCE, // avg
+ };
+
+ RecordDescriptor inRecordDesc = new RecordDescriptor(inFields);
+
+ RecordDescriptor outputRec = new RecordDescriptor(aggrFields);
+
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+ INormalizedKeyComputerFactory normalizedKeyComputerFactory = new UTF8StringNormalizedKeyComputerFactory();
+
+ IAggregatorDescriptorFactory partialAggrInPlace = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(0, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(0, false) });
+
+ IAggregatorDescriptorFactory finalAggrInPlace = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldMergeAggregatorFactory(3, false) });
+
+ IAggregatorDescriptorFactory partialAggrInState = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(0, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(0, true) });
+
+ IAggregatorDescriptorFactory finalAggrInState = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(2, true),
+ new AvgFieldMergeAggregatorFactory(3, true) });
+
+ int[] keyFields = new int[] { 1 };
+ int[] keyFieldsAfterPartial = new int[] { 0 };
+
+ class ResultValidateWriter implements IFrameWriter {
+
+ final Map<Integer, String> keyValueMap;
+ FrameTupleAccessor resultAccessor = new FrameTupleAccessor(outputRec);
+
+ class Result {
+ Result(int i) {
+ sum = i;
+ count = 1;
+ }
+
+ int sum;
+ int count;
+ }
+
+ private Map<String, Result> answer;
+
+ public ResultValidateWriter(Map<Integer, String> keyValueMap) {
+ this.keyValueMap = keyValueMap;
+ answer = new HashMap<>();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (Map.Entry<Integer, String> keyValue : keyValueMap.entrySet()) {
+ Result result = answer.get(keyValue.getValue());
+ if (result == null) {
+ answer.put(keyValue.getValue(), new Result(keyValue.getKey()));
+ } else {
+ result.sum += keyValue.getKey();
+ result.count++;
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ resultAccessor.reset(buffer);
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ Object[] outRecord = new Object[outputRec.getFieldCount()];
+
+ for (int tid = 0; tid < resultAccessor.getTupleCount(); tid++) {
+ for (int fid = 0; fid < outputRec.getFieldCount(); fid++) {
+ bbis.setByteBuffer(resultAccessor.getBuffer(),
+ resultAccessor.getAbsoluteFieldStartOffset(tid, fid));
+ outRecord[fid] = outputRec.getFields()[fid].deserialize(di);
+ }
+ Result result = answer.remove((String) outRecord[0]);
+ assertNotNull(result);
+ assertEquals(result.sum, (int) outRecord[1]);
+ assertEquals(result.count, (int) outRecord[2]);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ Assert.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ assertEquals(0, answer.size());
+ }
+ }
+
+ @Test
+ public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 3;
+ int frameSize = 256;
+ int minDataSize = frameSize;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+ }
+
+ @Test
+ public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 3;
+ int frameSize = 256;
+ int minDataSize = frameSize * 4;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+ }
+
+ @Test
+ public void testBuildAndMergeBigObj() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 4;
+ int frameSize = 256;
+ int minDataSize = frameSize * 5;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
+ bigRecords);
+
+ }
+
+ protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;
+
+ protected abstract IFrameWriter getBuilder();
+
+ protected abstract IOperatorNodePushable getMerger();
+
+ private void testBuildAndMerge(int tableSize, int numFrames, int frameSize, int minDataSize,
+ int minRecordSize, int maxRecordSize,
+ Map<Integer, String> specialData)
+ throws HyracksDataException {
+
+ IHyracksTaskContext ctx = TestUtils.create(frameSize);
+ initial(ctx, tableSize, numFrames);
+ ArrayList<IFrame> input = new ArrayList<>();
+ Map<Integer, String> keyValueMap = new HashMap<>();
+ AbstractRunGeneratorTest
+ .prepareData(ctx, input, minDataSize, minRecordSize, maxRecordSize, specialData, keyValueMap);
+
+ ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
+
+ getBuilder().open();
+ for (IFrame frame : input) {
+ getBuilder().nextFrame(frame.getBuffer());
+ }
+ getBuilder().close();
+
+ getMerger().setOutputFrameWriter(0, writer, outputRec);
+ getMerger().initialize();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 3cc2a23..673c6fa 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -45,8 +45,8 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.apache.hyracks.test.support.TestUtils;
@@ -54,8 +54,8 @@ import org.junit.Test;
public abstract class AbstractRunGeneratorTest {
static TestUtils testUtils = new TestUtils();
- static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer() };
static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
static Random GRandom = new Random(System.currentTimeMillis());
static int[] SortFields = new int[] { 0, 1 };
@@ -63,17 +63,17 @@ public abstract class AbstractRunGeneratorTest {
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
- static void assertMaxFrameSizesAreAllEqualsTo(List<RunAndMaxFrameSizePair> maxSize, int pageSize) {
+ static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
for (int i = 0; i < maxSize.size(); i++) {
- assertTrue(maxSize.get(i).maxFrameSize == pageSize);
+ assertTrue(maxSize.get(i).getMaxFrameSize() == pageSize);
}
}
abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
throws HyracksDataException;
- protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns,
- int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+ protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
+ int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
IHyracksTaskContext ctx = testUtils.create(pageSize);
HashMap<Integer, String> keyValuePair = new HashMap<>();
@@ -90,12 +90,12 @@ public abstract class AbstractRunGeneratorTest {
return runGenerator.getRuns();
}
- static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
+ static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
Map<Integer, String> keyValuePair) throws HyracksDataException {
HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
int maxFrameSizes = 0;
- for (RunAndMaxFrameSizePair run : runs) {
- maxFrameSizes = Math.max(maxFrameSizes, run.maxFrameSize);
+ for (GeneratedRunFileReader run : runs) {
+ maxFrameSizes = Math.max(maxFrameSizes, run.getMaxFrameSize());
}
GroupVSizeFrame gframe = new GroupVSizeFrame(ctx, maxFrameSizes);
GroupFrameAccessor gfta = new GroupFrameAccessor(ctx.getInitialFrameSize(), RecordDesc);
@@ -125,25 +125,25 @@ public abstract class AbstractRunGeneratorTest {
return preKey;
}
- static void assertReadSorted(List<RunAndMaxFrameSizePair> runs, IFrameTupleAccessor fta, IFrame frame,
+ static void assertReadSorted(List<GeneratedRunFileReader> runs, IFrameTupleAccessor fta, IFrame frame,
Map<Integer, String> keyValuePair) throws HyracksDataException {
assertTrue(runs.size() > 0);
- for (RunAndMaxFrameSizePair run : runs) {
- run.run.open();
+ for (GeneratedRunFileReader run : runs) {
+ run.open();
int preKey = Integer.MIN_VALUE;
- while (run.run.nextFrame(frame)) {
+ while (run.nextFrame(frame)) {
fta.reset(frame.getBuffer());
preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
}
- run.run.close();
+ run.close();
}
assertTrue(keyValuePair.isEmpty());
}
static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
- throws HyracksDataException {
+ throws HyracksDataException {
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
FrameTupleAppender appender = new FrameTupleAppender();
@@ -223,7 +223,7 @@ public abstract class AbstractRunGeneratorTest {
int numRuns = 2;
int minRecordSize = pageSize / 8;
int maxRecordSize = pageSize / 8;
- List<RunAndMaxFrameSizePair> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+ List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
maxRecordSize, null);
assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
}
@@ -235,8 +235,8 @@ public abstract class AbstractRunGeneratorTest {
int numRuns = 2;
int minRecordSize = pageSize;
int maxRecordSize = (int) (pageSize * 1.8);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, null);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ null);
assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
}
@@ -248,12 +248,12 @@ public abstract class AbstractRunGeneratorTest {
int minRecordSize = 20;
int maxRecordSize = pageSize / 2;
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, specialPair);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ specialPair);
int max = 0;
- for (RunAndMaxFrameSizePair run : size) {
- max = Math.max(max, run.maxFrameSize);
+ for (GeneratedRunFileReader run : size) {
+ max = Math.max(max, run.getMaxFrameSize());
}
assertTrue(max == pageSize * (frameLimit - 1));
}
@@ -266,8 +266,8 @@ public abstract class AbstractRunGeneratorTest {
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
int minRecordSize = 10;
int maxRecordSize = pageSize / 2;
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, specialPair);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ specialPair);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
new file mode 100644
index 0000000..f1a4231
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupBuildOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupWriteOperatorNodePushable;
+
+public class ExternalHashGroupbyTest extends AbstractExternalGroupbyTest {
+ ExternalGroupBuildOperatorNodePushable buildOperator;
+ ExternalGroupWriteOperatorNodePushable mergeOperator;
+
+ @Override
+ protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) {
+ ISpillableTableFactory tableFactory = new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE });
+ buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx, this.hashCode(), tableSize,
+ numFrames * ctx.getInitialFrameSize(), keyFields, numFrames, comparatorFactories,
+ normalizedKeyComputerFactory, partialAggrInPlace, inRecordDesc, outputRec, tableFactory);
+ mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx, this.hashCode(), tableFactory, outputRec,
+ outputRec, numFrames, keyFieldsAfterPartial, normalizedKeyComputerFactory, comparatorFactories,
+ finalAggrInPlace);
+ }
+
+ @Override
+ protected IFrameWriter getBuilder() {
+ return buildOperator;
+ }
+
+ @Override
+ protected AbstractUnaryOutputSourceOperatorNodePushable getMerger() {
+ return mergeOperator;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index e6d10f2..567b7df 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -55,10 +55,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.junit.Test;
@@ -234,8 +233,8 @@ public class RunMergingFrameReaderTest {
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize,
- readerList, frameList, keyValueMapList);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
Comparators, null, RecordDesc, topK);
@@ -313,8 +312,8 @@ public class RunMergingFrameReaderTest {
int maxRecordSize = pageSize / 2;
IHyracksTaskContext ctx = testUtils.create(pageSize);
- ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
- ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
+ ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+ RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
runGenerator.open();
Map<Integer, String> keyValuePair = new HashMap<>();
@@ -336,20 +335,19 @@ public class RunMergingFrameReaderTest {
}
runGenerator.close();
List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
- for (RunAndMaxFrameSizePair max : runGenerator.getRuns()) {
- inFrame.add(new GroupVSizeFrame(ctx, max.maxFrameSize));
+ for (GeneratedRunFileReader max : runGenerator.getRuns()) {
+ inFrame.add(new GroupVSizeFrame(ctx, max.getMaxFrameSize()));
}
// Let each run file reader not delete the run file when it is read and closed.
- for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
- RunFileReader runFileReader = (RunFileReader) run.run;
- PA.setValue(runFileReader, "deleteAfterClose", false);
+ for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+ PA.setValue(run, "deleteAfterClose", false);
}
matchResult(ctx, runGenerator.getRuns(), keyValuePair);
List<IFrameReader> runs = new ArrayList<>();
- for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
- runs.add(run.run);
+ for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+ runs.add(run);
}
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
RecordDesc);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
new file mode 100644
index 0000000..bcf661f
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunGenerator;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunMerger;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public class SortGroupbyTest extends AbstractExternalGroupbyTest {
+ ExternalSortGroupByRunGenerator builder;
+
+ IOperatorNodePushable mergerOperator;
+
+ @Override
+ protected void initial(final IHyracksTaskContext ctx, int tableSize, final int numFrames)
+ throws HyracksDataException {
+ builder = new ExternalSortGroupByRunGenerator(ctx, keyFields, inRecordDesc, numFrames, keyFields,
+ normalizedKeyComputerFactory, comparatorFactories, partialAggrInState, outputRec, Algorithm.QUICK_SORT);
+
+ mergerOperator = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ List<GeneratedRunFileReader> runs = builder.getRuns();
+ ISorter sorter = builder.getSorter();
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory == null ? null
+ : normalizedKeyComputerFactory.createNormalizedKeyComputer();
+ AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
+ inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+ partialAggrInState, finalAggrInState, true);
+ merger.process();
+ }
+ };
+ }
+
+ @Override
+ protected IFrameWriter getBuilder() {
+ return builder;
+ }
+
+ @Override
+ protected IOperatorNodePushable getMerger() {
+ return mergerOperator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/text-example/textclient/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index 3bf69e2..a88c421 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -67,24 +67,7 @@
<goal>assemble</goal>
</goals>
</execution>
- <execution>
- <id>groupclient</id>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.examples.text.client.ExternalGroupClient</mainClass>
- <name>groupclient</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
+ </executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
deleted file mode 100644
index 1ae3258..0000000
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.examples.text.client;
-
-import java.io.File;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-
-/**
- * The application client for the performance tests of the external hash group
- * operator.
- */
-public class ExternalGroupClient {
- private static class Options {
- @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
- public String host;
-
- @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
- public int port = 1098;
-
- @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
- public String inFileSplits;
-
- @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
- public String outFileSplits;
-
- @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
- public int htSize = 8191;
-
- @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
- public int frameSize = 32768;
-
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
- public int sbSize = 512;
-
- @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
- public boolean sortOutput = false;
-
- @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
- public boolean outPlain = true;
-
- @Option(name = "-algo", usage = "The algorithm to be used", required = true)
- public int algo;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
-
- IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
- JobSpecification job;
-
- for (int i = 0; i < 6; i++) {
- long start = System.currentTimeMillis();
- job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
- options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
- options.outPlain);
-
- System.out.print(i + "\t" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
- System.out.println("\t" + (System.currentTimeMillis() - start));
- }
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits, int count) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
- + count)));
- }
- return fSplits;
- }
-
- private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
- int frameSize, boolean sortOutput, int alg, boolean outPlain) {
- JobSpecification spec = new JobSpecification(frameSize);
- IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-
- RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
- FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
-
- createPartitionConstraint(spec, fileScanner, inSplits);
-
- // Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- // IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
-
- // Specify the grouping key, which will be the string extracted during
- // the scan.
- int[] keys = new int[] { 0,
- // 1
- };
-
- AbstractOperatorDescriptor grouper;
-
- switch (alg) {
- case 0: // new external hash graph
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
-
- break;
- case 1: // External-sort + new-precluster
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
- createPartitionConstraint(spec, sorter2, inSplits);
-
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
-
- grouper = new org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor(
- spec, keys, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
- break;
- case 2: // Inmem
- grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc, htSize);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanConn2, fileScanner, 0, grouper, 0);
- break;
- default:
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
- }
-
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-
- AbstractSingleActivityOperatorDescriptor writer;
-
- if (outPlain)
- writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
- else
- writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-
- createPartitionConstraint(spec, writer, outSplits);
-
- IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(groupOutConn, grouper, 0, writer, 0);
-
- spec.addRoot(writer);
- return spec;
- }
-
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
- String[] parts = new String[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- parts[i] = splits[i].getNodeName();
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
- }
-}