You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:32 UTC
[02/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR]
Parallel Sort Optimization
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 3335d71..fa11f73 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -52,6 +52,10 @@ import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.api.exceptions.SourceLocation;
+/**
+ * Pre-conditions:
+ * FixReplicateOperatorOutputsRule should be fired
+ */
public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
private final HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents =
@@ -62,6 +66,8 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<>();
private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<>();
private int lastUsedClusterId = 0;
+ private final Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs = new HashMap<>();
+ private final List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs = new ArrayList<>();
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -268,11 +274,71 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
context.computeAndSetTypeEnvironmentForOperator(parentOp);
}
}
+ cleanupPlan();
rewritten = true;
}
return rewritten;
}
+ /**
+ * Cleans up the plan after combining similar branches into one branch, making sure parents & children point to
+ * each other correctly.
+ */
+ private void cleanupPlan() {
+ for (Mutable<ILogicalOperator> root : roots) {
+ replicateToOutputs.clear();
+ newOutputs.clear();
+ findReplicateOp(root, replicateToOutputs);
+ cleanup(replicateToOutputs, newOutputs);
+ }
+ }
+
+ /**
+ * Updates the output references of a replicate operator to point to the valid parents.
+ * @param replicateToOutputs where the replicate operators are stored with their valid parents.
+ * @param newOutputs the valid parents of the replicate operator.
+ */
+ private void cleanup(Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs,
+ List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs) {
+ replicateToOutputs.forEach((repRef, allOutputs) -> {
+ newOutputs.clear();
+ // get the indexes that are set in the BitSet
+ allOutputs.stream().forEach(outIndex -> {
+ newOutputs.add(new Pair<>(((AbstractReplicateOperator) repRef.getValue()).getOutputs().get(outIndex),
+ ((AbstractReplicateOperator) repRef.getValue()).getOutputMaterializationFlags()[outIndex]));
+ });
+ ((AbstractReplicateOperator) repRef.getValue()).setOutputs(newOutputs);
+ });
+ }
+
+ /**
+ * Collects all replicate operators starting from {@param parent} and all its descendants and keeps track of the
+ * valid parents of each replicate operator. The indexes of valid parents will be set in the BitSet.
+ * @param parent the current operator in consideration for which we want to find replicate op children.
+ * @param replicateToOutputs where the replicate operators will be stored with all their parents (valid & invalid).
+ */
+ private void findReplicateOp(Mutable<ILogicalOperator> parent,
+ Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs) {
+ List<Mutable<ILogicalOperator>> children = parent.getValue().getInputs();
+ for (Mutable<ILogicalOperator> childRef : children) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+ if (child.getOperatorTag() == LogicalOperatorTag.REPLICATE
+ || child.getOperatorTag() == LogicalOperatorTag.SPLIT) {
+ AbstractReplicateOperator replicateChild = (AbstractReplicateOperator) child;
+ int parentIndex = replicateChild.getOutputs().indexOf(parent);
+ if (parentIndex >= 0) {
+ BitSet replicateValidOutputs = replicateToOutputs.get(childRef);
+ if (replicateValidOutputs == null) {
+ replicateValidOutputs = new BitSet();
+ replicateToOutputs.put(childRef, replicateValidOutputs);
+ }
+ replicateValidOutputs.set(parentIndex);
+ }
+ }
+ findReplicateOp(childRef, replicateToOutputs);
+ }
+ }
+
private void genCandidates(IOptimizationContext context) throws AlgebricksException {
List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses =
new ArrayList<List<Mutable<ILogicalOperator>>>();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 6967271..5d6237a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -67,15 +67,18 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
*/
public class InlineVariablesRule implements IAlgebraicRewriteRule {
- // Map of variables that could be replaced by their producing expression.
- // Populated during the top-down sweep of the plan.
- protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
- // Visitor for replacing variable reference expressions with their originating expression.
+ // map of variables that could be replaced by their producing expression.
+ // populated during the top-down sweep of the plan.
+ private Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
+ // visitor for replacing variable reference expressions with their originating expression.
protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
- // Set of FunctionIdentifiers that we should not inline.
+ // set of FunctionIdentifiers that we should not inline.
protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>();
- // Indicates whether the rule has been run
- protected boolean hasRun = false;
+ // indicates whether the rule has been run
+ private boolean hasRun = false;
+ // map used to prevent re-visiting a subtree from multiple sides. Operators with multiple outputs are the ones
+ // that could be re-visited twice or more (e.g. replicate and split operators)
+ private final Map<ILogicalOperator, Boolean> subTreesDone = new HashMap<>();
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
@@ -103,6 +106,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
protected void prepare(IOptimizationContext context) {
varAssignRhs.clear();
inlineVisitor.setContext(context);
+ subTreesDone.clear();
}
protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
@@ -118,10 +122,14 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
return false;
}
- protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ private boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ // check if you have already visited the subtree rooted at this operator
+ if (subTreesDone.containsKey(op)) {
+ return subTreesDone.get(op);
+ }
// Update mapping from variables to expressions during top-down traversal.
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
AssignOperator assignOp = (AssignOperator) op;
@@ -183,6 +191,10 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
// Re-enable rules that we may have already tried. They could be applicable now after inlining.
context.removeFromAlreadyCompared(opRef.getValue());
}
+ // mark the subtree rooted at op as visited so that you don't visit it again
+ if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) {
+ subTreesDone.put(op, modified);
+ }
return modified;
}
@@ -209,7 +221,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
this.context = context;
}
- public void setOperator(ILogicalOperator op) throws AlgebricksException {
+ public void setOperator(ILogicalOperator op) {
this.op = op;
liveVars.clear();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 4869761..4273553 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceS
import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
@@ -394,6 +395,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
op.setPhysicalOperator(new SinkPOperator());
break;
}
+ case FORWARD:
+ op.setPhysicalOperator(new ForwardPOperator());
+ break;
}
}
if (op.hasNestedPlans()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 35aa984..6b09894 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -34,10 +34,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -246,6 +247,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
}
@Override
+ public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ return visit(op);
+ }
+
+ @Override
public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
return visit(op);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
index 0dcc83a..2068ef3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
@@ -22,5 +22,23 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface ITuplePartitionComputer {
+ /**
+ * For the tuple (located at tIndex in the frame), it determines which target partition (0,1,... nParts-1) the tuple
+ * should be sent/written to.
+ * @param accessor The accessor of the frame to access tuples
+ * @param tIndex The index of the tuple in consideration
+ * @param nParts The number of target partitions
+ * @return The chosen target partition number as dictated by the logic of the partition computer
+ * @throws HyracksDataException
+ */
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException;
+
+ /**
+ * Gives the data partitioner a chance to set up its environment before it starts partitioning tuples. This method
+ * should be called in the open() of {@link org.apache.hyracks.api.comm.IFrameWriter}. The default implementation
+ * is "do nothing".
+ * @throws HyracksDataException
+ */
+ public default void initialize() throws HyracksDataException {
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
index cde0057..81f9053 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.api.dataflow.value;
import java.io.Serializable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
public interface ITuplePartitionComputerFactory extends Serializable {
- public ITuplePartitionComputer createPartitioner();
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 09193d9..7d126ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -149,6 +149,9 @@ public class ErrorCode {
public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113;
public static final int NODE_IS_NOT_ACTIVE = 114;
public static final int LOCAL_NETWORK_ERROR = 115;
+ public static final int ONE_TUPLE_RANGEMAP_EXPECTED = 116;
+ public static final int NO_RANGEMAP_PRODUCED = 117;
+ public static final int RANGEMAP_NOT_FOUND = 118;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index c704d7e..50e92b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -132,6 +132,9 @@
113 = Undefined inverted-list merge type: %1$s
114 = Node (%1$s) is not active
115 = Local network error
+116 = One tuple rangemap is expected
+117 = No range map produced for parallel sort
+118 = Range map was not found for parallel sort
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
new file mode 100644
index 0000000..d858a7f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.DataInputStream;
+
+public class ByteArrayAccessibleDataInputStream extends DataInputStream {
+
+ public ByteArrayAccessibleDataInputStream(ByteArrayAccessibleInputStream in) {
+ super(in);
+ }
+
+ public ByteArrayAccessibleInputStream getInputStream() {
+ return (ByteArrayAccessibleInputStream) in;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
new file mode 100644
index 0000000..2785751
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.ByteArrayInputStream;
+
+public class ByteArrayAccessibleInputStream extends ByteArrayInputStream {
+
+ public ByteArrayAccessibleInputStream(byte[] buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public void setContent(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + length, buf.length);
+ this.mark = offset;
+ }
+
+ public byte[] getArray() {
+ return buf;
+ }
+
+ public int getPosition() {
+ return pos;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index dc66d19..ab5ab01 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.dataflow.common.data.partition;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -36,7 +37,7 @@ public class FieldHashPartitionComputerFactory implements ITuplePartitionCompute
}
@Override
- public ITuplePartitionComputer createPartitioner() {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
for (int i = 0; i < hashFunctionFactories.length; ++i) {
hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
new file mode 100644
index 0000000..e55841a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class OnePartitionComputerFactory implements ITuplePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+ return 0;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
index e034af0..63d01fc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.common.data.partition;
import java.util.Random;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,7 +31,7 @@ public class RandomPartitionComputerFactory implements ITuplePartitionComputerFa
private static final long serialVersionUID = 1L;
@Override
- public ITuplePartitionComputer createPartitioner() {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
return new ITuplePartitionComputer() {
private final Random random = new Random();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
index 9cb11fa..1821d78 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.dataflow.common.data.partition;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,9 +36,9 @@ public class RepartitionComputerFactory implements ITuplePartitionComputerFactor
}
@Override
- public ITuplePartitionComputer createPartitioner() {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
return new ITuplePartitionComputer() {
- private ITuplePartitionComputer delegate = delegateFactory.createPartitioner();
+ private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(hyracksTaskContext);
@Override
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..bc642a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+/**
+ * A range partition computer factory whose {@link RangeMap} is resolved at run time:
+ * the map is looked up in the task context under {@code rangeMapKeyInContext} instead of
+ * being serialized with the factory. Compare {@link StaticFieldRangePartitionComputerFactory},
+ * which carries the map itself.
+ */
+public class DynamicFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+ // key under which the range map is expected in the task's shared context
+ // NOTE(review): presumably placed there by an upstream task at run time — confirm against the producer side
+ private final String rangeMapKeyInContext;
+ // used for error reporting when the range map is missing
+ private final SourceLocation sourceLocation;
+
+ public DynamicFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+ String rangeMapKeyInContext, SourceLocation sourceLocation) {
+ super(rangeFields, comparatorFactories);
+ this.rangeMapKeyInContext = rangeMapKeyInContext;
+ this.sourceLocation = sourceLocation;
+ }
+
+ /**
+ * Fetches the range map from the task context.
+ *
+ * @throws HyracksDataException with {@code RANGEMAP_NOT_FOUND} if no map was stored under the key
+ */
+ @Override
+ protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException {
+ RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext, hyracksTaskContext);
+ if (rangeMap == null) {
+ // fail with a source location so the user can relate the error to the query text
+ throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
+ }
+ return rangeMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index d58a248..55d4420 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -19,36 +19,41 @@
package org.apache.hyracks.dataflow.common.data.partition.range;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public abstract class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
private final int[] rangeFields;
- private IRangeMap rangeMap;
private IBinaryComparatorFactory[] comparatorFactories;
- public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
- IRangeMap rangeMap) {
+ protected FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories) {
this.rangeFields = rangeFields;
this.comparatorFactories = comparatorFactories;
- this.rangeMap = rangeMap;
}
+ protected abstract RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException;
+
@Override
- public ITuplePartitionComputer createPartitioner() {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+
return new ITuplePartitionComputer() {
+ private RangeMap rangeMap;
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ rangeMap = getRangeMap(hyracksTaskContext);
+ }
+
@Override
- /**
- * Determine the range partition.
- */
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
if (nParts == 1) {
return 0;
@@ -62,13 +67,10 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput
return (int) Math.floor(slotIndex / rangesPerPart);
}
- /*
- * Determine the range partition.
- */
- public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ private int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
int slotIndex = 0;
- for (int i = 0; i < rangeMap.getSplitCount(); ++i) {
- int c = compareSlotAndFields(accessor, tIndex, i);
+ for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
+ int c = compareSlotAndFields(accessor, tIndex, slotNumber);
if (c < 0) {
return slotIndex;
}
@@ -77,18 +79,18 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput
return slotIndex;
}
- public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex)
+ private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int slotNumber)
throws HyracksDataException {
int c = 0;
int startOffset = accessor.getTupleStartOffset(tIndex);
int slotLength = accessor.getFieldSlotsLength();
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = rangeFields[f];
+ for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+ int fIdx = rangeFields[fieldNum];
int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
- c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
- fEnd - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f),
- rangeMap.getLength(fieldIndex, f));
+ c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
+ fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
+ rangeMap.getLength(fieldNum, slotNumber));
if (c != 0) {
return c;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
deleted file mode 100644
index 5c5f34b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-import org.apache.hyracks.data.std.api.IPointable;
-
-public interface IRangeMap {
- public IPointable getFieldSplit(int columnIndex, int splitIndex);
-
- public int getSplitCount();
-
- public byte[] getByteArray(int columnIndex, int splitIndex);
-
- public int getStartOffset(int columnIndex, int splitIndex);
-
- public int getLength(int columnIndex, int splitIndex);
-
- public int getTag(int columnIndex, int splitIndex);
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 98acbc0..714e3c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -19,80 +19,110 @@
package org.apache.hyracks.dataflow.common.data.partition.range;
import java.io.Serializable;
-
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
+import java.util.Arrays;
+import java.util.Objects;
/**
- * The range map stores the field split values in an byte array.
- * The first split value for each field followed by the second split value for each field, etc.
+ * <pre>
+ * The range map stores the fields split values in a byte array.
+ * The first split value for each field followed by the second split value for each field, etc. For example:
+ * split_point_idx0 split_point_idx1 split_point_idx2 split_point_idx3 split_point_idx4
+ * in the byte[]: f0,f1,f2 f0,f1,f2 f0,f1,f2 f0,f1,f2 f0,f1,f2
+ * numFields would be = 3
+ * we have 5 split points, which gives us 6 partitions:
+ * p1 | p2 | p3 | p4 | p5 | p6
+ * sp0 sp1 sp2 sp3 sp4
+ * endOffsets.length would be = 15
+ * </pre>
*/
-public class RangeMap implements IRangeMap, Serializable {
- private final int fields;
+public class RangeMap implements Serializable {
+ private final int numFields;
private final byte[] bytes;
- private final int[] offsets;
+ private final int[] endOffsets;
- public RangeMap(int fields, byte[] bytes, int[] offsets) {
- this.fields = fields;
+ public RangeMap(int numFields, byte[] bytes, int[] endOffsets) {
+ this.numFields = numFields;
this.bytes = bytes;
- this.offsets = offsets;
- }
-
- @Override
- public IPointable getFieldSplit(int columnIndex, int splitIndex) {
- IPointable p = VoidPointable.FACTORY.createPointable();
- int index = getFieldIndex(columnIndex, splitIndex);
- p.set(bytes, getFieldStart(index), getFieldLength(index));
- return p;
+ this.endOffsets = endOffsets;
}
- @Override
public int getSplitCount() {
- return offsets.length / fields;
+ return endOffsets.length / numFields;
}
- @Override
- public byte[] getByteArray(int columnIndex, int splitIndex) {
+ public byte[] getByteArray() {
return bytes;
}
- @Override
- public int getTag(int columnIndex, int splitIndex) {
- return getFieldTag(getFieldIndex(columnIndex, splitIndex));
+ public int getTag(int fieldIndex, int splitIndex) {
+ return getSplitValueTag(getSplitValueIndex(fieldIndex, splitIndex));
}
- @Override
- public int getStartOffset(int columnIndex, int splitIndex) {
- return getFieldStart(getFieldIndex(columnIndex, splitIndex));
+ public int getStartOffset(int fieldIndex, int splitIndex) {
+ return getSplitValueStart(getSplitValueIndex(fieldIndex, splitIndex));
}
- @Override
- public int getLength(int columnIndex, int splitIndex) {
- return getFieldLength(getFieldIndex(columnIndex, splitIndex));
+ public int getLength(int fieldIndex, int splitIndex) {
+ return getSplitValueLength(getSplitValueIndex(fieldIndex, splitIndex));
}
- private int getFieldIndex(int columnIndex, int splitIndex) {
- return splitIndex * fields + columnIndex;
+ /** Translates fieldIndex & splitIndex into an index which is used to find information about that split value.
+ * The combination of a fieldIndex & splitIndex uniquely identifies a split value of interest.
+ * @param fieldIndex the field index within the splitIndex of interest (0 <= fieldIndex < numFields)
+ * @param splitIndex starts with 0,1,2,.. etc
+ * @return the index of the desired split value that could be used with {@code bytes} & {@code endOffsets}.
+ */
+ private int getSplitValueIndex(int fieldIndex, int splitIndex) {
+ return splitIndex * numFields + fieldIndex;
}
- private int getFieldTag(int index) {
- return bytes[getFieldStart(index)];
+ /**
+ * @param splitValueIndex is the combination of the split index + the field index within that split index
+ * @return the type tag of a specific field in a specific split point
+ */
+ private int getSplitValueTag(int splitValueIndex) {
+ return bytes[getSplitValueStart(splitValueIndex)];
}
- private int getFieldStart(int index) {
+ /**
+ * @param splitValueIndex is the combination of the split index + the field index within that split index
+ * @return the location of a split value in the byte array {@code bytes}
+ */
+ private int getSplitValueStart(int splitValueIndex) {
int start = 0;
- if (index != 0) {
- start = offsets[index - 1];
+ if (splitValueIndex != 0) {
+ start = endOffsets[splitValueIndex - 1];
}
return start;
}
- private int getFieldLength(int index) {
- int length = offsets[index];
- if (index != 0) {
- length -= offsets[index - 1];
+ /**
+ * @param splitValueIndex is the combination of the split index + the field index within that split index
+ * @return the length of a split value
+ */
+ private int getSplitValueLength(int splitValueIndex) {
+ int length = endOffsets[splitValueIndex];
+ if (splitValueIndex != 0) {
+ length -= endOffsets[splitValueIndex - 1];
}
return length;
}
+ @Override
+ public int hashCode() {
+ return numFields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (!(object instanceof RangeMap)) {
+ return false;
+ }
+ RangeMap other = (RangeMap) object;
+ return numFields == other.numFields && Arrays.equals(endOffsets, other.endOffsets)
+ && Arrays.equals(bytes, other.bytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..b17c550
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * A range partition computer factory whose {@link RangeMap} is fixed at job-construction time
+ * and serialized along with the factory. Compare {@link DynamicFieldRangePartitionComputerFactory},
+ * which resolves the map from the task context at run time.
+ */
+public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+ // the precomputed split points; shipped with the factory as part of the job
+ private RangeMap rangeMap;
+
+ public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+ RangeMap rangeMap) {
+ super(rangeFields, comparatorFactories);
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
+ // the map is known up front; the task context is not consulted
+ return rangeMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 7d97507..8b57b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -20,7 +20,14 @@ package org.apache.hyracks.dataflow.std.base;
import java.util.BitSet;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -47,4 +54,15 @@ public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorD
public boolean allProducersToAllConsumers() {
return true;
}
+
+ @Override
+ public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+ int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ BitSet expectedPartitions = new BitSet(nProducerPartitions);
+ expectedPartitions.set(0, nProducerPartitions);
+ NonDeterministicChannelReader channelReader =
+ new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
+ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index ab553f6..4c728ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -180,6 +180,7 @@ public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperat
@Override
public void fail() throws HyracksDataException {
+ // TODO: shouldn't we fail the MaterializerTaskState state?
HyracksDataException hde = null;
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
if (isOpen[i]) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
new file mode 100644
index 0000000..c437619
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.PartitionId;
+
+/**
+ * Collects one {@link IFrameReader} per sender, indexed by sender, and hands them out in
+ * sender order. {@link #getNextBatch} blocks until every sender has registered a channel,
+ * so consumers observe the full, deterministic sender ordering.
+ * Thread-safety: all state is guarded by the instance monitor (all methods are synchronized);
+ * addPartition/getNextBatch coordinate via wait/notifyAll on {@code this}.
+ */
+public class DeterministicPartitionBatchManager implements IPartitionBatchManager {
+ // slot per sender; a null slot means that sender has not registered yet
+ private final IFrameReader[] partitions;
+ // ordered queue of not-yet-consumed readers; created once all senders have arrived
+ private List<IFrameReader> partitionsList;
+
+ public DeterministicPartitionBatchManager(int nSenders) {
+ this.partitions = new IFrameReader[nSenders];
+ }
+
+ @Override
+ public synchronized void addPartition(PartitionId partitionId, IInputChannel channel) {
+ InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+ channel.registerMonitor(channelReader);
+ // place the reader at its sender's slot to preserve sender order
+ partitions[partitionId.getSenderIndex()] = channelReader;
+ if (allPartitionsAdded()) {
+ // last sender just arrived: publish the ordered list and wake blocked consumers
+ partitionsList = new ArrayList<>(Arrays.asList(partitions));
+ notifyAll();
+ }
+ }
+
+ /**
+ * Moves up to {@code requestedSize} readers (in sender order) into {@code batch}.
+ * Blocks until all senders have registered; re-asserts the interrupt flag if interrupted.
+ */
+ @Override
+ public synchronized void getNextBatch(List<IFrameReader> batch, int requestedSize) throws HyracksDataException {
+ while (!allPartitionsAdded()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ if (partitionsList.isEmpty()) {
+ // everything already handed out
+ return;
+ }
+ if (requestedSize >= partitionsList.size()) {
+ // drain the remainder
+ batch.addAll(partitionsList);
+ partitionsList.clear();
+ } else {
+ // subList is a view of partitionsList, so clearing it removes the handed-out
+ // prefix from the backing list as well
+ List<IFrameReader> subBatch = partitionsList.subList(0, requestedSize);
+ batch.addAll(subBatch);
+ subBatch.clear();
+ }
+ }
+
+ // true once every sender slot has been filled; callers already hold the monitor,
+ // the synchronized keyword also covers the call from within addPartition
+ private synchronized boolean allPartitionsAdded() {
+ for (int i = 0; i < partitions.length; i++) {
+ if (partitions[i] == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
new file mode 100644
index 0000000..2646c94
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.collectors;
+
+import java.util.LinkedList;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A frame reader that concatenates its senders' streams: it obtains all senders from the
+ * {@link IPartitionBatchManager} on open, then serves frames by fully draining one sender
+ * before moving to the next, in the order the batch manager returns them.
+ */
+public class SequentialMergeFrameReader implements IFrameReader {
+ private final int numSenders;
+ private final IPartitionBatchManager partitionBatchManager;
+ // senders not yet exhausted; the head is the one currently being read
+ private final LinkedList<IFrameReader> senders;
+ // guards against double-open
+ private boolean isOpen;
+
+ public SequentialMergeFrameReader(int numSenders, IPartitionBatchManager partitionBatchManager) {
+ this.numSenders = numSenders;
+ this.partitionBatchManager = partitionBatchManager;
+ this.senders = new LinkedList<>();
+ this.isOpen = false;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (!isOpen) {
+ isOpen = true;
+ // get all the senders and open them one by one
+ // (requesting numSenders asks the batch manager for the complete batch;
+ // this call may block until all senders have registered)
+ partitionBatchManager.getNextBatch(senders, numSenders);
+ for (IFrameReader sender : senders) {
+ sender.open();
+ }
+ }
+ }
+
+ /**
+ * Fills {@code outFrame} from the current head sender; when a sender is exhausted it is
+ * closed and removed, and reading continues with the next one.
+ *
+ * @return true if a frame was produced, false once every sender is exhausted
+ */
+ @Override
+ public boolean nextFrame(IFrame outFrame) throws HyracksDataException {
+ IFrameReader currentSender;
+ while (!senders.isEmpty()) {
+ // process the sender at the beginning of the sequence
+ currentSender = senders.getFirst();
+ outFrame.reset();
+ if (currentSender.nextFrame(outFrame)) {
+ return true;
+ } else {
+ // done with the current sender, close it, remove it from the Q and process the next one in sequence
+ currentSender.close();
+ senders.removeFirst();
+ }
+ }
+ // done with all senders
+ return false;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // closes only the senders that were not already closed by nextFrame
+ // NOTE(review): if one sender's close() throws, the remaining senders are not closed —
+ // consider aggregating failures so all senders get closed
+ for (IFrameReader sender : senders) {
+ sender.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 920fdb8..0b6e40e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -61,7 +61,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
- return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
+ return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(ctx),
nConsumerPartitions, localityMap, index);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index 092b5f1..32618ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -19,19 +19,14 @@
package org.apache.hyracks.dataflow.std.connectors;
import java.nio.ByteBuffer;
-import java.util.BitSet;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor {
@@ -123,15 +118,4 @@ public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescr
}
};
}
-
- @Override
- public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
- int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- BitSet expectedPartitions = new BitSet(nProducerPartitions);
- expectedPartitions.set(0, nProducerPartitions);
- NonDeterministicChannelReader channelReader =
- new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
- NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
- return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 02fbedb..c11c08c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -18,10 +18,7 @@
*/
package org.apache.hyracks.dataflow.std.connectors;
-import java.util.BitSet;
-
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -29,9 +26,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -46,18 +40,7 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
- return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
- }
-
- @Override
- public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
- int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- BitSet expectedPartitions = new BitSet(nProducerPartitions);
- expectedPartitions.set(0, nProducerPartitions);
- NonDeterministicChannelReader channelReader =
- new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
- NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
- return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+ return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
}
public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 026ca5e..e0ec5d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -70,7 +70,7 @@ public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConn
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
final PartitionDataWriter hashWriter =
- new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+ new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
return hashWriter;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index f6996f1..af3ce06 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -45,6 +45,6 @@ public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitio
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
- tpcf.createPartitioner());
+ tpcf.createPartitioner(ctx));
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
new file mode 100644
index 0000000..3decb69
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.OnePartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.collectors.DeterministicPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+import org.apache.hyracks.dataflow.std.collectors.SequentialMergeFrameReader;
+
+public class MToOneSequentialMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final ITuplePartitionComputerFactory tpcf;
+
+ public MToOneSequentialMergingConnectorDescriptor(IConnectorDescriptorRegistry spec) {
+ super(spec);
+ tpcf = new OnePartitionComputerFactory();
+ }
+
+ @Override
+ public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+ IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ // TODO(ali): create a single partition data writer instead
+ return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
+ }
+
+ @Override
+ public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+ int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ IPartitionBatchManager pbm = new DeterministicPartitionBatchManager(nProducerPartitions);
+ IFrameReader sequentialMergeReader = new SequentialMergeFrameReader(nProducerPartitions, pbm);
+ BitSet expectedPartitions = new BitSet();
+ expectedPartitions.set(0, nProducerPartitions);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sequentialMergeReader, pbm);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 5e33275..d06d5d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -113,6 +113,7 @@ public class PartitionDataWriter implements IFrameWriter {
@Override
public void open() throws HyracksDataException {
+ tpc.initialize();
for (int i = 0; i < pWriters.length; ++i) {
isOpen[i] = true;
pWriters[i].open();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index dc250e6..034b054 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -190,7 +190,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
private final ITuplePartitionComputer hpcBuild =
- new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+ new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
private final FrameTupleAppender appender = new FrameTupleAppender();
private final FrameTupleAppender ftappender = new FrameTupleAppender();
private IFrame[] bufferForPartitions;
@@ -303,9 +303,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
}
ITuplePartitionComputer hpc0 =
- new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner();
+ new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx);
ITuplePartitionComputer hpc1 =
- new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+ new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
state.joiner =
@@ -385,7 +385,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
private final ITuplePartitionComputerFactory hpcf1 =
new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
- private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
+ private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(ctx);
private final FrameTupleAppender appender = new FrameTupleAppender();
private final FrameTupleAppender ftap = new FrameTupleAppender();
@@ -476,9 +476,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
state.joiner.releaseMemory();
}
ITuplePartitionComputer hpcRep0 =
- new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner();
+ new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx);
ITuplePartitionComputer hpcRep1 =
- new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner();
+ new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx);
if (state.memoryForHashtable != memsize - 2) {
for (int i = 0; i < state.nPartitions; i++) {
ByteBuffer buf = bufferForPartitions[i].getBuffer();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 3873bae..a5c17f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -182,9 +182,9 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
@Override
public void open() throws HyracksDataException {
ITuplePartitionComputer hpc0 =
- new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner();
+ new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx);
ITuplePartitionComputer hpc1 =
- new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+ new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
new TaskId(getActivityId(), partition));
ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);