You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:10 UTC
[20/50] [abbrv] asterixdb git commit: snapshot for range state
transition
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
deleted file mode 100644
index ff2e40b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-public interface IRangeMap {
-
- public int getSplitCount();
-
- public byte[] getByteArray(int columnIndex, int splitIndex);
-
- public int getStartOffset(int columnIndex, int splitIndex);
-
- public int getLength(int columnIndex, int splitIndex);
-
- public int getTag(int columnIndex, int splitIndex);
-
- // Min value functions
- public byte[] getMinByteArray(int columnIndex);
-
- public int getMinStartOffset(int columnIndex);
-
- public int getMinLength(int columnIndex);
-
- public int getMinTag(int columnIndex);
-
- // Max value functions
- public byte[] getMaxByteArray(int columnIndex);
-
- public int getMaxStartOffset(int columnIndex);
-
- public int getMaxLength(int columnIndex);
-
- public int getMaxTag(int columnIndex);
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
deleted file mode 100644
index dcde70b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-public interface IRangePartitionType {
- public enum RangePartitioningType {
- /**
- * Partitioning is determined by finding the range partition where the first data point lies.
- */
- PROJECT,
- /**
- * Partitioning is determined by finding the range partition where the last data point lies.
- */
- PROJECT_END,
- /**
- * Partitioning is determined by finding all the range partitions where the data has a point.
- */
- SPLIT,
- /**
- * Partitioning is determined by finding all the range partitions where the data has a point
- * or comes after the data point.
- */
- REPLICATE
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 00fb86d..c15d39a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.common.data.partition.range;
import java.io.Serializable;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+
/**
* The range map stores the field split values in an byte array.
* The first and last split values for each column represent the min and max values (not actually split values).
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
new file mode 100644
index 0000000..befaad9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.io.Serializable;
+
+/**
+ * Represents a range id in a logical plan.
+ */
+public final class RangeId implements Serializable { // immutable value type whose identity is the wrapped int id
+ private static final long serialVersionUID = 1L; // serialization version for the Serializable contract
+ private final int id; // assigned once in the constructor and never reassigned
+
+ public RangeId(int id) { // stores the given id as-is; no validation is performed here
+ this.id = id;
+ }
+
+ public int getId() { // exposes the raw int id
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return "RangeId(#" + id + ")"; // renders as e.g. RangeId(#3)
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RangeId)) { // also rejects null, because instanceof is false for null
+ return false;
+ } else {
+ return id == ((RangeId) obj).getId(); // equal exactly when the wrapped ids match
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return id; // consistent with equals: equal ids yield equal hash codes
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
index 068d11a..520ddd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
@@ -34,24 +34,28 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
import org.apache.hyracks.dataflow.std.collectors.SortMergeFrameReader;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
private final ITupleRangePartitionComputerFactory tprcf;
+ private final RangeId rangeId;
private final int[] sortFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory nkcFactory;
public MToNRangePartitionMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ITupleRangePartitionComputerFactory tprcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
- INormalizedKeyComputerFactory nkcFactory) {
+ ITupleRangePartitionComputerFactory tprcf, RangeId rangeId, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) {
super(spec);
this.tprcf = tprcf;
+ this.rangeId = rangeId;
this.sortFields = sortFields;
this.comparatorFactories = comparatorFactories;
this.nkcFactory = nkcFactory;
@@ -61,9 +65,7 @@ public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNCo
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
- final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
- recordDesc, tprcf.createPartitioner());
- return rangeWriter;
+ return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tprcf, rangeId);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
index 2338993..6c83abb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
@@ -43,15 +44,17 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec
private static final long serialVersionUID = 1L;
private final ITupleRangePartitionComputerFactory trpcf;
+ private final RangeId rangeId;
private final int[] sortFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory nkcFactory;
public MToNRangePartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
- ITupleRangePartitionComputerFactory trpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
- INormalizedKeyComputerFactory nkcFactory) {
+ ITupleRangePartitionComputerFactory trpcf, RangeId rangeId, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) {
super(spec);
this.trpcf = trpcf;
+ this.rangeId = rangeId;
this.sortFields = sortFields;
this.comparatorFactories = comparatorFactories;
this.nkcFactory = nkcFactory;
@@ -61,9 +64,7 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
throws HyracksDataException {
- final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
- recordDesc, trpcf.createPartitioner());
- return rangeWriter;
+ return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, trpcf, rangeId);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index f8240b8..2740a60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -23,25 +23,38 @@ import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.storage.IGrowableIntArray;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
import org.apache.hyracks.storage.common.arraylist.IntArrayList;
public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
- private final ITupleRangePartitionComputer tpc;
+ private final ITupleRangePartitionComputerFactory trpcf;
+ private final RangeId rangeId;
private final IGrowableIntArray map;
+ private ITupleRangePartitionComputer tpc;
public PartitionRangeDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
- IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITupleRangePartitionComputer tpc)
- throws HyracksDataException {
+ IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor,
+ ITupleRangePartitionComputerFactory trpcf, RangeId rangeId) throws HyracksDataException {
super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
- this.tpc = tpc;
+ this.trpcf = trpcf;
+ this.rangeId = rangeId;
this.map = new IntArrayList(8, 8);
}
@Override
+ public void open() throws HyracksDataException { // creates the partition computer at open time instead of receiving it in the constructor
+ super.open();
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); // fetches the state object stored under rangeId
+ tpc = trpcf.createPartitioner(rangeState.getRangeMap()); // NOTE(review): rangeState is dereferenced unchecked; confirm the state is always set before this writer opens
+ }
+
+ @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (!allocatedFrame) {
allocateFrames();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
index 1087cf5..850bf56 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
@@ -20,12 +20,13 @@ package org.apache.hyracks.dataflow.std.join;
import java.io.Serializable;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
public interface IMergeJoinCheckerFactory extends Serializable {
- IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) throws HyracksDataException;
+ IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException;
RangePartitioningType getLeftPartitioningType();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 649247e..6f0b33b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
locks.setPartitions(nPartitions);
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
- partition);
+ partition, null);
return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
index 7ca2542..15df580 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
@@ -20,8 +20,9 @@ package org.apache.hyracks.dataflow.std.join;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@@ -32,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory
}
@Override
- public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) {
+ public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 15d91be..067246d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -19,30 +19,23 @@
package org.apache.hyracks.dataflow.std.misc;
import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
+import org.apache.hyracks.dataflow.std.base.RangeId;
public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -50,11 +43,15 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int RANGE_FORWARD_ACTIVITY_ID = 0;
private static final int RANGE_WRITER_ACTIVITY_ID = 1;
+ private final RangeId rangeId;
private final IRangeMap rangeMap;
- public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, IRangeMap rangeMap) {
+ public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, RangeId rangeId, IRangeMap rangeMap,
+ RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
+ this.rangeId = rangeId;
this.rangeMap = rangeMap;
+ recordDescriptors[0] = recordDescriptor;
}
@Override
@@ -68,8 +65,8 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
public static class RangeForwardTaskState extends AbstractStateObject {
private IRangeMap rangeMap;
- public RangeForwardTaskState(JobId jobId, TaskId taskId, IRangeMap rangeMap) {
- super(jobId, taskId);
+ public RangeForwardTaskState(JobId jobId, RangeId rangeId, IRangeMap rangeMap) {
+ super(jobId, rangeId);
this.rangeMap = rangeMap;
}
@@ -93,8 +90,7 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void open() throws HyracksDataException {
- state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition), rangeMap);
+ state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap);
ctx.setStateObject(state);
writer.open();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index c3adc20..273d5ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -19,9 +19,6 @@
package org.apache.hyracks.dataflow.std.sort;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.logging.Level;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
index 2cf166b..637e195 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
@@ -32,10 +32,12 @@ import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.storage.IGrowableIntArray;
import org.apache.hyracks.data.std.accessors.PointableBinaryRangeAscComparatorFactory;
@@ -48,8 +50,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
import org.apache.hyracks.storage.common.arraylist.IntArrayList;
import org.apache.hyracks.test.support.TestUtils;
@@ -143,8 +143,8 @@ public class FieldRangePartitionComputerFactoryTest extends TestCase {
IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
int[] rangeFields = new int[] { 0 };
ITupleRangePartitionComputerFactory frpcf = new FieldRangePartitionComputerFactory(rangeFields,
- comparatorFactories, rangeMap, rangeType);
- ITupleRangePartitionComputer partitioner = frpcf.createPartitioner();
+ comparatorFactories, rangeType);
+ ITupleRangePartitionComputer partitioner = frpcf.createPartitioner(rangeMap);
IFrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
ByteBuffer buffer = prepareData(ctx, integers);