You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:31 UTC
[14/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
VariableSizeFrame(VSizeFrame) support for Hyracks.
This patch replaced Frame/Accessor/Appender with the new API which
supports BigObject.
The ExternalSorter/TopKSorter/ExternalGroupSorter
have been implemented to support big object.
The Groupby && Join should work with BigObject also. But it will break the
memory budget when it encounter a big object. I will fix the memory
problem later in a separate CR.
The design about the frame allocation is
here:https://docs.google.com/presentation/d/15h9iQf5OYsgGZoQTbGHkj1yS2G9q2fd0s1lDAD1EJq0/edit?usp=sharing
Suggest review order:
Patch 12: It includes all of the sorting operators.
Patch 13: It applys the new IFrame API to all Hyracks codes.
Patch 14: Some bug fixes to pass all Asterix's tests.
Patch 15: Skip it!
Patch 16: Some bug fixes to the Asterix's tests in small frame setting.
Later Patch: address the comments
Change-Id: I2e08692078683f6f2cf17387e39037ad851fc05b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/234
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/0d87a57f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/0d87a57f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/0d87a57f
Branch: refs/heads/master
Commit: 0d87a57f7439eca69e6dae73f117747b4ea51746
Parents: 0e5d531
Author: JavierJia <ji...@gmail.com>
Authored: Wed Jun 17 20:07:38 2015 -0700
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Wed Jun 17 20:45:14 2015 -0700
----------------------------------------------------------------------
...estedPlansAccumulatingAggregatorFactory.java | 6 +-
.../NestedPlansRunningAggregatorFactory.java | 27 +-
...eInputOneOutputOneFieldFramePushRuntime.java | 41 ++
...actOneInputOneOutputOneFramePushRuntime.java | 96 +--
...AbstractOneInputOneOutputRuntimeFactory.java | 2 +-
.../MicroPreClusteredGroupRuntimeFactory.java | 61 +-
.../operators/meta/SubplanRuntimeFactory.java | 14 +-
.../sort/InMemorySortRuntimeFactory.java | 13 +-
.../std/EmptyTupleSourceRuntimeFactory.java | 10 +-
.../std/NestedTupleSourceRuntimeFactory.java | 4 +-
.../PartitioningSplitOperatorDescriptor.java | 28 +-
.../operators/std/SinkWriterRuntime.java | 4 +-
.../std/StreamSelectRuntimeFactory.java | 49 +-
.../ics/hyracks/api/comm/FixedSizeFrame.java | 59 ++
.../ics/hyracks/api/comm/FrameConstants.java | 31 +-
.../uci/ics/hyracks/api/comm/FrameHelper.java | 61 +-
.../edu/uci/ics/hyracks/api/comm/IFrame.java | 60 ++
.../ics/hyracks/api/comm/IFrameAppender.java | 55 ++
.../hyracks/api/comm/IFrameFieldAppender.java | 47 ++
.../uci/ics/hyracks/api/comm/IFrameReader.java | 8 +-
.../hyracks/api/comm/IFrameTupleAccessor.java | 25 +-
.../hyracks/api/comm/IFrameTupleAppender.java | 39 +
.../uci/ics/hyracks/api/comm/IFrameWriter.java | 11 +-
.../hyracks/api/comm/NoShrinkVSizeFrame.java | 31 +
.../uci/ics/hyracks/api/comm/VSizeFrame.java | 76 ++
.../api/context/IHyracksCommonContext.java | 10 +-
.../api/context/IHyracksFrameMgrContext.java | 39 +
.../api/dataset/IHyracksDatasetReader.java | 5 +-
.../client/dataset/DatasetClientContext.java | 21 +-
.../client/dataset/HyracksDatasetReader.java | 27 +-
.../comm/channels/NetworkInputChannel.java | 2 +-
.../comm/channels/NetworkOutputChannel.java | 52 +-
.../comm/channels/ReadBufferFactory.java | 2 +-
.../edu/uci/ics/hyracks/control/nc/Joblet.java | 35 +-
.../edu/uci/ics/hyracks/control/nc/Task.java | 24 +-
.../nc/dataset/DatasetPartitionWriter.java | 2 +-
.../hyracks/control/nc/dataset/ResultState.java | 3 +
.../ics/hyracks/control/nc/io/IOManager.java | 11 +-
.../MaterializedPartitionInputChannel.java | 3 +
.../control/nc/partitions/PartitionManager.java | 4 +-
.../ConnectorReceiverProfilingFrameReader.java | 7 +-
.../nc/resources/memory/FrameManager.java | 85 +++
hyracks/hyracks-dataflow-common/pom.xml | 6 +
.../common/comm/io/AbstractFrameAppender.java | 99 +++
.../common/comm/io/ArrayTupleBuilder.java | 1 +
.../dataflow/common/comm/io/FrameConstants.java | 23 -
.../common/comm/io/FrameDeserializer.java | 5 +-
.../comm/io/FrameDeserializingDataReader.java | 19 +-
.../comm/io/FrameDeserializingDataWriter.java | 2 +-
.../common/comm/io/FrameFixedFieldAppender.java | 108 +++
.../comm/io/FrameFixedFieldTupleAppender.java | 130 ++++
.../common/comm/io/FrameOutputStream.java | 24 +-
.../common/comm/io/FrameTupleAccessor.java | 101 ++-
.../common/comm/io/FrameTupleAppender.java | 237 +++---
.../comm/io/FrameTupleAppenderAccessor.java | 131 ++++
.../comm/io/FrameTupleAppenderWrapper.java | 80 +--
.../comm/io/ResultFrameTupleAccessor.java | 72 +-
.../common/comm/io/SerializingDataWriter.java | 32 +-
.../dataflow/common/comm/util/FrameUtils.java | 260 ++++++-
.../dataflow/common/io/RunFileReader.java | 28 +-
.../dataflow/common/util/IntSerDeUtils.java | 6 +
.../FrameFixedFieldTupleAppenderTest.java | 215 ++++++
.../hadoop/HadoopReadOperatorDescriptor.java | 21 +-
.../dataflow/hadoop/mapreduce/HadoopHelper.java | 2 +-
.../dataflow/hadoop/mapreduce/KVIterator.java | 20 +-
.../mapreduce/MapperOperatorDescriptor.java | 50 +-
.../dataflow/hadoop/mapreduce/ReduceWriter.java | 32 +-
.../hadoop/mapreduce/ShuffleFrameReader.java | 83 ++-
hyracks/hyracks-dataflow-std/pom.xml | 6 +
.../std/collectors/InputChannelFrameReader.java | 69 +-
.../NonDeterministicChannelReader.java | 27 +-
.../collectors/NonDeterministicFrameReader.java | 30 +-
.../std/collectors/SortMergeFrameReader.java | 17 +-
.../LocalityAwarePartitionDataWriter.java | 34 +-
.../std/connectors/PartitionDataWriter.java | 42 +-
.../file/DelimitedDataTupleParserFactory.java | 26 +-
.../file/PlainFileWriterOperatorDescriptor.java | 9 +-
.../std/group/HashSpillableTableFactory.java | 63 +-
.../dataflow/std/group/ISpillableTable.java | 4 +-
.../ExternalGroupBuildOperatorNodePushable.java | 2 +-
.../ExternalGroupMergeOperatorNodePushable.java | 63 +-
.../std/group/hash/GroupingHashTable.java | 45 +-
.../HashGroupBuildOperatorNodePushable.java | 2 +-
.../PreclusteredGroupOperatorNodePushable.java | 8 -
.../preclustered/PreclusteredGroupWriter.java | 29 +-
.../sort/ExternalSortGroupByRunGenerator.java | 98 +--
.../sort/ExternalSortGroupByRunMerger.java | 144 +---
.../sort/SortGroupByOperatorDescriptor.java | 255 ++-----
.../join/GraceHashJoinOperatorNodePushable.java | 20 +-
...hJoinPartitionBuildOperatorNodePushable.java | 20 +-
.../join/HybridHashJoinOperatorDescriptor.java | 106 +--
.../dataflow/std/join/InMemoryHashJoin.java | 52 +-
.../InMemoryHashJoinOperatorDescriptor.java | 26 +-
.../dataflow/std/join/NestedLoopJoin.java | 96 +--
.../join/NestedLoopJoinOperatorDescriptor.java | 22 +-
.../std/join/OptimizedHybridHashJoin.java | 122 ++--
...timizedHybridHashJoinOperatorDescriptor.java | 130 ++--
...ConstantTupleSourceOperatorNodePushable.java | 10 +-
.../std/misc/LimitOperatorDescriptor.java | 12 +-
.../std/misc/MaterializerTaskState.java | 8 +-
.../misc/MaterializingOperatorDescriptor.java | 9 +-
.../std/misc/SplitOperatorDescriptor.java | 4 +-
.../result/ResultWriterOperatorDescriptor.java | 19 +-
.../dataflow/std/sort/AbstractFrameSorter.java | 186 +++++
.../std/sort/AbstractSortRunGenerator.java | 77 ++
.../sort/AbstractSorterOperatorDescriptor.java | 197 +++++
.../hyracks/dataflow/std/sort/BSTMemMgr.java | 717 -------------------
.../hyracks/dataflow/std/sort/BSTNodeUtil.java | 233 ------
.../sort/ExternalSortOperatorDescriptor.java | 211 ++----
.../std/sort/ExternalSortRunGenerator.java | 127 ++--
.../std/sort/ExternalSortRunMerger.java | 435 +++++------
.../dataflow/std/sort/FrameSorterMergeSort.java | 172 +----
.../dataflow/std/sort/FrameSorterQuickSort.java | 157 +---
.../dataflow/std/sort/HeapSortRunGenerator.java | 99 +++
.../std/sort/HybridTopKSortRunGenerator.java | 109 +++
.../hyracks/dataflow/std/sort/IFrameSorter.java | 31 +-
.../dataflow/std/sort/IMemoryManager.java | 88 ---
.../dataflow/std/sort/IRunGenerator.java | 3 +-
.../dataflow/std/sort/ISelectionTree.java | 90 ---
.../ics/hyracks/dataflow/std/sort/ISorter.java | 33 +
.../hyracks/dataflow/std/sort/ITupleSorter.java | 26 +
.../sort/InMemorySortOperatorDescriptor.java | 27 +-
...OptimizedExternalSortOperatorDescriptor.java | 218 ------
.../sort/OptimizedExternalSortRunGenerator.java | 283 --------
...imizedExternalSortRunGeneratorWithLimit.java | 436 -----------
.../std/sort/RunAndMaxFrameSizePair.java | 32 +
.../std/sort/RunMergingFrameReader.java | 115 +--
.../uci/ics/hyracks/dataflow/std/sort/Slot.java | 81 ---
.../hyracks/dataflow/std/sort/SortMinHeap.java | 293 --------
.../dataflow/std/sort/SortMinMaxHeap.java | 448 ------------
.../std/sort/TopKSorterOperatorDescriptor.java | 62 ++
.../dataflow/std/sort/TupleSorterHeapSort.java | 269 +++++++
.../sort/buffermanager/EnumFreeSlotPolicy.java | 22 +
.../FrameFreeSlotBiggestFirst.java | 97 +++
.../buffermanager/FrameFreeSlotLastFit.java | 81 +++
.../buffermanager/FrameFreeSlotSmallestFit.java | 59 ++
.../sort/buffermanager/IFrameBufferManager.java | 68 ++
.../buffermanager/IFrameFreeSlotPolicy.java | 44 ++
.../std/sort/buffermanager/IFramePool.java | 48 ++
.../buffermanager/ITupleBufferAccessor.java | 36 +
.../sort/buffermanager/ITupleBufferManager.java | 42 ++
.../VariableFrameMemoryManager.java | 132 ++++
.../sort/buffermanager/VariableFramePool.java | 200 ++++++
.../VariableTupleMemoryManager.java | 203 ++++++
.../sort/util/DeletableFrameTupleAppender.java | 244 +++++++
.../std/sort/util/GroupFrameAccessor.java | 170 +++++
.../dataflow/std/sort/util/GroupVSizeFrame.java | 46 ++
.../IAppendDeletableFrameTupleAccessor.java | 72 ++
.../dataflow/std/structures/AbstractHeap.java | 156 ++++
.../hyracks/dataflow/std/structures/IHeap.java | 44 ++
.../dataflow/std/structures/IMaxHeap.java | 43 ++
.../dataflow/std/structures/IMinHeap.java | 42 ++
.../dataflow/std/structures/IMinMaxHeap.java | 18 +
.../dataflow/std/structures/IResetable.java | 20 +
.../std/structures/IResetableComparable.java | 19 +
.../structures/IResetableComparableFactory.java | 20 +
.../dataflow/std/structures/MaxHeap.java | 63 ++
.../dataflow/std/structures/MinHeap.java | 62 ++
.../dataflow/std/structures/MinMaxHeap.java | 217 ++++++
.../std/structures/SerializableHashTable.java | 2 +-
.../dataflow/std/structures/TuplePointer.java | 43 +-
.../util/DeserializedOperatorNodePushable.java | 2 +-
.../ics/hyracks/dataflow/std/util/MathUtil.java | 50 ++
.../dataflow/std/util/ReferenceEntry.java | 14 +-
.../std/util/ReferencedPriorityQueue.java | 31 +-
.../ics/hyracks/dataflow/std/sort/Utility.java | 23 +
.../dataflow/std/sort/buffermanager/Common.java | 26 +
.../FrameFreeSlotBestFitUsingTreeMapTest.java | 60 ++
.../FrameFreeSlotBiggestFirstTest.java | 70 ++
.../buffermanager/FrameFreeSlotLastFitTest.java | 86 +++
.../buffermanager/VariableFramePoolTest.java | 216 ++++++
.../VariableFramesMemoryManagerTest.java | 170 +++++
.../VariableTupleMemoryManagerTest.java | 230 ++++++
.../util/DeletableFrameTupleAppenderTest.java | 233 ++++++
.../std/structures/AbstracHeapTest.java | 86 +++
.../dataflow/std/structures/MaxHeapTest.java | 99 +++
.../dataflow/std/structures/MinHeapTest.java | 102 +++
.../dataflow/std/structures/MinMaxHeapTest.java | 109 +++
.../ics/hyracks/dataflow/std/util/MathTest.java | 40 ++
.../btree/helper/DataGenOperatorDescriptor.java | 12 +-
.../hyracks-integration-tests/.gitignore | 3 +
.../comm/SerializationDeserializationTest.java | 20 +-
.../integration/AbstractIntegrationTest.java | 37 +-
.../AbstractMultiNCIntegrationTest.java | 21 +-
.../integration/OptimizedSortMergeTest.java | 31 +-
.../integration/VSizeFrameSortMergeTest.java | 118 +++
.../tests/unit/AbstractRunGeneratorTest.java | 279 ++++++++
.../unit/ExternalSortRunGeneratorTest.java | 32 +
.../tests/unit/HeapSortRunGeneratorTest.java | 37 +
.../tests/unit/HybridSortRunGenerator.java | 30 +
.../tests/unit/RunMergingFrameReaderTest.java | 409 +++++++++++
.../tests/unit/TopKRunGeneratorTest.java | 208 ++++++
.../examples/text/WordTupleParserFactory.java | 19 +-
.../tpch-example/tpchclient/pom.xml | 2 +-
.../hyracks/examples/tpch/client/Common.java | 83 +++
.../ics/hyracks/examples/tpch/client/Join.java | 320 +++++++++
.../ics/hyracks/examples/tpch/client/Main.java | 362 ----------
.../ics/hyracks/examples/tpch/client/Sort.java | 165 +++++
.../dataflow/HDFSWriteOperatorDescriptor.java | 2 +-
.../hdfs/lib/TextKeyValueParserFactory.java | 18 +-
.../dataflow/HDFSWriteOperatorDescriptor.java | 2 +-
.../BTreeUpdateSearchOperatorNodePushable.java | 11 +-
.../IndexBulkLoadOperatorNodePushable.java | 2 +-
...xInsertUpdateDeleteOperatorNodePushable.java | 13 +-
.../IndexSearchOperatorNodePushable.java | 34 +-
...eIndexDiskOrderScanOperatorNodePushable.java | 22 +-
.../TreeIndexStatsOperatorNodePushable.java | 13 +-
...xInsertUpdateDeleteOperatorNodePushable.java | 27 +-
.../BinaryTokenizerOperatorNodePushable.java | 31 +-
...InvertedIndexSearchOperatorNodePushable.java | 2 +
.../ondisk/FixedSizeFrameTupleAccessor.java | 10 +
.../ondisk/OnDiskInvertedIndex.java | 12 +-
.../search/AbstractTOccurrenceSearcher.java | 26 +-
.../search/PartitionedTOccurrenceSearcher.java | 9 +-
.../lsm/invertedindex/search/SearchResult.java | 4 +-
.../search/TOccurrenceSearcher.java | 4 +-
.../hyracks/test/support/TestJobletContext.java | 20 +-
.../hyracks/test/support/TestTaskContext.java | 21 +-
.../storage/am/btree/BTreeStatsTest.java | 11 +-
.../storage/am/btree/FieldPrefixNSMTest.java | 11 +-
220 files changed, 10258 insertions(+), 6056 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index af4bff2..618768c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -50,7 +50,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
- final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+ final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length,
decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
@@ -173,7 +173,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
private ArrayTupleBuilder tb;
private AlgebricksPipeline[] subplans;
- public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+ public AggregatorOutput(AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
this.subplans = subplans;
// this.keyFieldIndexes = keyFieldIndexes;
int totalAggFields = 0;
@@ -187,7 +187,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
for (int i = 0; i < inputRecDesc.length; i++) {
- tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+ tAccess[i] = new FrameTupleAccessor(inputRecDesc[i]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index c8dc852..b7e736e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -23,6 +23,7 @@ import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -67,10 +68,6 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
- final ByteBuffer outputFrame = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputFrame, true);
-
return new IAggregatorDescriptor() {
@Override
@@ -167,7 +164,6 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
private final ArrayTupleBuilder gbyTb;
private final AlgebricksPipeline[] subplans;
private final IFrameWriter outputWriter;
- private final ByteBuffer outputFrame;
private final FrameTupleAppender outputAppender;
public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
@@ -188,12 +184,10 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
for (int i = 0; i < inputRecDesc.length; i++) {
- tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+ tAccess[i] = new FrameTupleAccessor(inputRecDesc[i]);
}
- this.outputFrame = ctx.allocateFrame();
- this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- this.outputAppender.reset(outputFrame, true);
+ this.outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
@Override
@@ -221,23 +215,14 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
for (int f = 0; f < w; f++) {
tb.addField(accessor, tIndex, f);
}
- if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(outputFrame, outputWriter);
- outputAppender.reset(outputFrame, true);
- if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException(
- "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
- }
- }
+ FrameUtils.appendToWriter(outputWriter, outputAppender, tb.getFieldEndOffsets(),
+ tb.getByteArray(), 0, tb.getSize());
}
}
@Override
public void close() throws HyracksDataException {
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputFrame, outputWriter);
- outputAppender.reset(outputFrame, true);
- }
+ outputAppender.flush(outputWriter, true);
}
public void setInputIdx(int inputIdx) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
new file mode 100644
index 0000000..44c1736
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFieldFramePushRuntime.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameFieldAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractOneInputOneOutputOneFieldFramePushRuntime
+ extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+ @Override
+ protected IFrameTupleAppender getTupleAppender() {
+ return (FrameFixedFieldTupleAppender) appender;
+ }
+
+ protected IFrameFieldAppender getFieldAppender() {
+ return (FrameFixedFieldTupleAppender) appender;
+ }
+
+ protected final void initAccessAppendFieldRef(IHyracksTaskContext ctx) throws HyracksDataException {
+ frame = new VSizeFrame(ctx);
+ appender = new FrameFixedFieldTupleAppender(inputRecordDesc.getFieldCount());
+ appender.reset(frame, true);
+ tAccess = new FrameTupleAccessor(inputRecordDesc);
+ tRef = new FrameTupleReference();
+ }
+
+ protected void appendField(byte[] array, int start, int length) throws HyracksDataException {
+ FrameUtils.appendFieldToWriter(writer, getFieldAppender(), array, start, length);
+ }
+
+ protected void appendField(IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException {
+ FrameUtils.appendFieldToWriter(writer, getFieldAppender(), accessor, tid, fid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 81052d6..ec4e039 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -26,42 +29,53 @@ import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
- protected FrameTupleAppender appender;
- protected ByteBuffer frame;
+ protected IFrameAppender appender;
+ protected IFrame frame;
protected FrameTupleAccessor tAccess;
protected FrameTupleReference tRef;
+ protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
+ frame = new VSizeFrame(ctx);
+ appender = new FrameTupleAppender(frame);
+ tAccess = new FrameTupleAccessor(inputRecordDesc);
+ }
+
+ protected final void initAccessAppendRef(IHyracksTaskContext ctx) throws HyracksDataException {
+ initAccessAppend(ctx);
+ tRef = new FrameTupleReference();
+ }
+
@Override
public void close() throws HyracksDataException {
flushIfNotFailed();
writer.close();
- appender.reset(frame, true);
+ }
+
+ protected void flushAndReset() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ appender.flush(writer, true);
+ }
}
protected void flushIfNotFailed() throws HyracksDataException {
if (!failed) {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
+ flushAndReset();
}
}
+ protected IFrameTupleAppender getTupleAppender() {
+ return (FrameTupleAppender) appender;
+ }
+
protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
appendToFrameFromTupleBuilder(tb, false);
}
protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb, boolean flushFrame) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException(
- "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
- }
- }
+ FrameUtils.appendToWriter(writer, getTupleAppender(), tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
if (flushFrame) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
+ flushAndReset();
}
}
@@ -71,52 +85,18 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
throws HyracksDataException {
- if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
- }
- return;
- }
+ FrameUtils.appendProjectionToWriter(writer, getTupleAppender(), tAccess, tIndex, projectionList);
if (flushFrame) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
+ flushAndReset();
}
}
protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
- if (!appender.append(tAccess, tIndex)) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tAccess, tIndex)) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
- }
- }
- }
-
- protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
- // if (allocFrame) {
- frame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
- // }
- tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
- }
-
- protected final void initAccessAppendRef(IHyracksTaskContext ctx) throws HyracksDataException {
- initAccessAppend(ctx);
- tRef = new FrameTupleReference();
+ FrameUtils.appendToWriter(writer, getTupleAppender(), tAccess, tIndex);
}
- protected final void initAccessAppendFieldRef(IHyracksTaskContext ctx) throws HyracksDataException {
- frame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize(), inputRecordDesc.getFieldCount());
- appender.reset(frame, true);
- tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
- tRef = new FrameTupleReference();
+ protected void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 43270b6..1d31916 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -24,7 +24,7 @@ public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRu
private static final long serialVersionUID = 1L;
- protected int[] projectionList;
+ protected final int[] projectionList;
public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
this.projectionList = projectionList;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 2e1171c..3380d08 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -26,7 +26,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
@@ -57,47 +56,37 @@ public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOut
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
- try {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- final ByteBuffer copyFrame = ctx.allocateFrame();
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
- ByteBuffer outFrame = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(outFrame, true);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
- return new AbstractOneInputOneOutputPushRuntime() {
+ return new AbstractOneInputOneOutputPushRuntime() {
- private PreclusteredGroupWriter pgw;
+ private PreclusteredGroupWriter pgw;
- @Override
- public void open() throws HyracksDataException {
- pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
- outRecordDesc, writer);
- pgw.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
+ outRecordDesc, writer);
+ pgw.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- pgw.nextFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ pgw.nextFrame(buffer);
+ }
- @Override
- public void fail() throws HyracksDataException {
- pgw.fail();
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ pgw.fail();
+ }
- @Override
- public void close() throws HyracksDataException {
- pgw.close();
- }
- };
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
- }
+ @Override
+ public void close() throws HyracksDataException {
+ pgw.close();
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index e5bede2..cf40669 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -33,7 +33,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -87,9 +86,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
class TupleOuterProduct implements IFrameWriter {
private boolean smthWasWritten = false;
- private IHyracksTaskContext hCtx = ctx;
- private int frameSize = hCtx.getFrameSize();
- private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
+ private FrameTupleAccessor ta = new FrameTupleAccessor(
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
@@ -103,14 +100,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
ta.reset(buffer);
int nTuple = ta.getTupleCount();
for (int t = 0; t < nTuple; t++) {
- if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
- throw new HyracksDataException(
- "Could not write frame: subplan result is larger than the single-frame limit.");
- }
- }
+ appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t);
}
smthWasWritten = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 54ed192..d3751f5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -25,6 +25,10 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -57,7 +61,10 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
@Override
public void open() throws HyracksDataException {
if (frameSorter == null) {
- frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
+ IFrameBufferManager manager = new VariableFrameMemoryManager(
+ new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
+ new FrameFreeSlotLastFit());
+ frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
comparatorFactories, outputRecordDesc);
}
frameSorter.reset();
@@ -76,8 +83,8 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
@Override
public void close() throws HyracksDataException {
- frameSorter.sortFrames();
- frameSorter.flushFrames(writer);
+ frameSorter.sort();
+ frameSorter.flush(writer);
writer.close();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 745fdf6..35fcafc 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -14,16 +14,14 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
@@ -41,18 +39,16 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
return new AbstractOneInputSourcePushRuntime() {
- private ByteBuffer frame = ctx.allocateFrame();
private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
- private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
@Override
public void open() throws HyracksDataException {
writer.open();
- appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException();
}
- FrameUtils.flushFrame(frame, writer);
+ appender.flush(writer, true);
writer.close();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 55dbcbb..8df87ab 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,7 +21,6 @@ import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
@@ -67,8 +66,7 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
}
public void forceFlush() throws HyracksDataException {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
+ appender.flush(writer, true);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index d19dd34..1d2d27c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -22,7 +22,9 @@ import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,15 +65,15 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
- private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
+ private final IFrame[] writeBuffers = new IFrame[outputArity];
private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
0);
- private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
+ private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
private final FrameTupleReference frameTuple = new FrameTupleReference();
- private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+ private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
private final DataOutput tupleDos = tupleBuilder.getDataOutput();
@@ -80,9 +82,8 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
// Flush (possibly not full) buffers that have data, and close writers.
for (int i = 0; i < outputArity; i++) {
tupleAppender.reset(writeBuffers[i], false);
- if (tupleAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffers[i], writers[i]);
- }
+ // ? by JF why didn't clear the buffer ?
+ tupleAppender.flush(writers[i], false);
writers[i].close();
}
}
@@ -133,17 +134,8 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
} catch (IOException e) {
throw new HyracksDataException(e);
}
- // Append to frame.
- tupleAppender.reset(writeBuffers[outputIndex], false);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
- tupleAppender.reset(writeBuffers[outputIndex], true);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException();
- }
- }
+ FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
}
@Override
@@ -153,7 +145,7 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
}
// Create write buffers.
for (int i = 0; i < outputArity; i++) {
- writeBuffers[i] = ctx.allocateFrame();
+ writeBuffers[i] = new VSizeFrame(ctx);
// Make sure to clear all buffers, since we are reusing the tupleAppender.
tupleAppender.reset(writeBuffers[i], true);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 37ad4a9..d26d090 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -41,7 +41,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
this.ctx = ctx;
this.printStream = printStream;
this.inputRecordDesc = inputRecordDesc;
- this.tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+ this.tAccess = new FrameTupleAccessor(inputRecordDesc);
}
public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
@@ -54,7 +54,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
public void open() throws HyracksDataException {
if (first) {
first = false;
- tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+ tAccess = new FrameTupleAccessor(inputRecordDesc);
try {
writer.init();
} catch (AlgebricksException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 17603e7..4ed33f4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -22,6 +22,7 @@ import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFieldFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,26 +32,24 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
- private IScalarEvaluatorFactory cond;
+ private final IScalarEvaluatorFactory cond;
- private IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+ private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
- private boolean retainNull;
+ private final boolean retainNull;
- private int nullPlaceholderVariableIndex;
+ private final int nullPlaceholderVariableIndex;
- private INullWriterFactory nullWriterFactory;
+ private final INullWriterFactory nullWriterFactory;
/**
* @param cond
- * @param projectionList
- * if projectionList is null, then no projection is performed
+ * @param projectionList if projectionList is null, then no projection is performed
* @param retainNull
* @param nullPlaceholderVariableIndex
* @param nullWriterFactory
@@ -75,7 +74,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
+ return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
private IScalarEvaluator eval;
private INullWriter nullWriter = null;
@@ -122,35 +121,11 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
}
} else {
if (retainNull) {
- //keep all field values as is except setting nullPlaceholderVariable field to null
- int i = 0;
- int tryCount = 0;
- while (true) {
- for (i = 0; i < tRef.getFieldCount(); i++) {
- if (i == nullPlaceholderVariableIndex) {
- if (!appender.appendField(nullTupleBuilder.getByteArray(), 0,
- nullTupleBuilder.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- break;
- }
- } else {
- if (!appender.appendField(tAccess, t, i)) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- break;
- }
- }
- }
-
- if (i == tRef.getFieldCount()) {
- break;
+ for (int i = 0; i < tRef.getFieldCount(); i++) {
+ if (i == nullPlaceholderVariableIndex) {
+ appendField(nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize());
} else {
- tryCount++;
- if (tryCount == 2) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime).");
- }
+ appendField(tAccess, t, i);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
new file mode 100644
index 0000000..ee92084
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FixedSizeFrame.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FixedSizeFrame implements IFrame{
+
+ private final ByteBuffer buffer;
+
+ public FixedSizeFrame(ByteBuffer buffer){
+ this.buffer = buffer;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public void ensureFrameSize(int frameSize) throws HyracksDataException {
+ throw new HyracksDataException("FixedSizeFrame doesn't support capacity changes");
+ }
+
+ @Override
+ public void resize(int frameSize) throws HyracksDataException {
+ throw new HyracksDataException("FixedSizeFrame doesn't support capacity changes");
+ }
+
+ @Override
+ public int getFrameSize() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public int getMinSize() {
+ return buffer.capacity() / FrameHelper.deserializeNumOfMinFrame(buffer, 0);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ buffer.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
index 0dc97bc..176f23e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
@@ -15,9 +15,34 @@
package edu.uci.ics.hyracks.api.comm;
public interface FrameConstants {
- public static final int SIZE_LEN = 4;
+ /**
+ * We use 4bytes to store the tupleCount at the end of the Frame.
+ */
+ int SIZE_LEN = 4;
- public static final boolean DEBUG_FRAME_IO = false;
+ /**
+ * The offset of the frame_count which is one byte indicate how many initial_frames contained in current frame.
+ * The actual frameSize = frame_count * intitialFrameSize(given by user)
+ */
+ int META_DATA_FRAME_COUNT_OFFSET = 0;
+
+ /**
+ * The start offset of the tuple data. The first byte is used to store the frame_count
+ */
+ int TUPLE_START_OFFSET = 1;
+
+ /**
+ * Since we use one byte to store the frame_count, the max frame_count is 255.
+ */
+ int MAX_NUM_MINFRAME = 255;
+
+ /**
+ * Indicate the total size of the meta data.
+ */
+ int META_DATA_LEN = SIZE_LEN + TUPLE_START_OFFSET;
+
+ boolean DEBUG_FRAME_IO = false;
+
+ int FRAME_FIELD_MAGIC = 0x12345678;
- public static final int FRAME_FIELD_MAGIC = 0x12345678;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
index a6774c7..2376d2e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
@@ -14,8 +14,65 @@
*/
package edu.uci.ics.hyracks.api.comm;
+import java.nio.ByteBuffer;
+
public class FrameHelper {
public static int getTupleCountOffset(int frameSize) {
- return frameSize - 4;
+ return frameSize - FrameConstants.SIZE_LEN;
+ }
+
+ /**
+ * The actual frameSize = frameCount * intitialFrameSize
+ * This method is used to put that frameCount into the first byte of the frame buffer.
+ * @param outputFrame
+ * @param numberOfMinFrame
+ */
+ public static void serializeFrameSize(ByteBuffer outputFrame, byte numberOfMinFrame) {
+ serializeFrameSize(outputFrame, 0, numberOfMinFrame);
+ }
+
+ public static void serializeFrameSize(ByteBuffer outputFrame, int start, byte numberOfMinFrame) {
+ outputFrame.array()[start + FrameConstants.META_DATA_FRAME_COUNT_OFFSET] = (byte) (numberOfMinFrame & 0xff);
+ }
+
+ public static byte deserializeNumOfMinFrame(ByteBuffer frame) {
+ return deserializeNumOfMinFrame(frame, 0);
+ }
+
+ public static byte deserializeNumOfMinFrame(ByteBuffer buffer, int start) {
+ return (byte) (buffer.array()[start + FrameConstants.META_DATA_FRAME_COUNT_OFFSET] & 0xff);
+ }
+
+ /**
+ * Add one tuple requires
+ * 4bytes to store the tuple offset
+ * 4bytes * |fields| to store the relative offset of each field
+ * nbytes the actual data.
+ * If the tupleLength includes the field slot, please set the fieldCount = 0
+ */
+ public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+ return 4 + fieldCount * 4 + tupleLength;
+ }
+
+ /**
+ * A faster way of calculating the ceiling
+ *
+ * @param fieldCount please set fieldCount to 0 if the tupleLength includes the fields' length
+ * @param tupleLength
+ * @param minFrameSize
+ * @return
+ */
+ public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
+ assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
+ return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+ * minFrameSize;
+ }
+
+ public static void clearRemainingFrame(ByteBuffer buffer, int position) {
+ buffer.array()[position] = 0;
+ }
+
+ public static boolean hasBeenCleared(ByteBuffer buffer, int position) {
+ return deserializeNumOfMinFrame(buffer, position) == 0;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
new file mode 100644
index 0000000..ccbbb0d
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrame.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrame {
+
+ ByteBuffer getBuffer();
+
+ /**
+ * Make sure the frameSize is bigger or equal to the given size
+ *
+ * @param frameSize
+ * @throws HyracksDataException
+ */
+ void ensureFrameSize(int frameSize) throws HyracksDataException;
+
+ /**
+ *
+ * Expand of shrink the inner buffer to make the size exactly equal to {@code frameSize}
+ * @param frameSize
+ */
+ void resize(int frameSize) throws HyracksDataException;
+
+ /**
+ * Return the size of frame in bytes
+ *
+ * @return
+ */
+ int getFrameSize();
+
+ /**
+ * Return the minimum frame size which should read from the configuration file given by user
+ *
+ * @return
+ */
+ int getMinSize();
+
+ /**
+ * Reset the status of buffer, prepare to the next round of read/write
+ */
+ void reset() throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
new file mode 100644
index 0000000..89b2bba
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameAppender.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameAppender {
+ /**
+ * Reset to attach to a new frame.
+ *
+ * @param frame the new frame
+ * @param clear indicate whether we need to clear this new frame
+ * @throws HyracksDataException
+ */
+ void reset(IFrame frame, boolean clear) throws HyracksDataException;
+
+ /**
+ * Get how many tuples in current frame.
+ *
+ * @return
+ */
+ int getTupleCount();
+
+ /**
+ * Get the ByteBuffer which contains the frame data.
+ *
+ * @return
+ */
+ ByteBuffer getBuffer();
+
+ /**
+ * Flush the frame content to the given writer.
+ * Clear the inner buffer after flush if {@code clear} is <code>true</code>.
+ *
+ * @param outWriter the output writer
+ * @param clear indicate whether to clear the inside frame after flushed or not.
+ * @throws HyracksDataException
+ */
+ void flush(IFrameWriter outWriter, boolean clear) throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
new file mode 100644
index 0000000..f66248f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameFieldAppender.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The IFrameFieldAppender is used to append the data into frame field by field.
+ */
+public interface IFrameFieldAppender extends IFrameAppender {
+
+ /**
+ * Append the field stored in {@code bytes} into the current frame.
+ *
+ * @param bytes the byte array that stores the field data
+ * @param offset the offset of the field data
+ * @param length the length of the field data
+ * @return true if the current frame has enough space to hold the field data, otherwise return false.
+ * @throws HyracksDataException
+ */
+ boolean appendField(byte[] bytes, int offset, int length) throws HyracksDataException;
+
+ /**
+ * Append the field of {@code fid} from the tuple {@code tid} whose information is stored in the {@code accessor}
+ * into the current frame.
+ *
+ * @param accessor tupleAccessor
+ * @param tid tuple id in tupleAccessor
+ * @param fid field id of the tuple {@code tid}
+ * @return true if the current frame has enough space to hold the field data, otherwise return false.
+ * @throws HyracksDataException
+ */
+ boolean appendField(IFrameTupleAccessor accessor, int tid, int fid) throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
index c72782a..cd3c5ab 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
@@ -14,14 +14,12 @@
*/
package edu.uci.ics.hyracks.api.comm;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IFrameReader {
- public void open() throws HyracksDataException;
+ void open() throws HyracksDataException;
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException;
+ boolean nextFrame(IFrame frame) throws HyracksDataException;
- public void close() throws HyracksDataException;
+ void close() throws HyracksDataException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
index ee34add..130704f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
@@ -17,23 +17,28 @@ package edu.uci.ics.hyracks.api.comm;
import java.nio.ByteBuffer;
public interface IFrameTupleAccessor {
- public int getFieldCount();
+ int getFieldCount();
- public int getFieldSlotsLength();
+ int getFieldSlotsLength();
- public int getFieldEndOffset(int tupleIndex, int fIdx);
+ int getFieldEndOffset(int tupleIndex, int fIdx);
- public int getFieldStartOffset(int tupleIndex, int fIdx);
+ int getFieldStartOffset(int tupleIndex, int fIdx);
- public int getFieldLength(int tupleIndex, int fIdx);
+ int getFieldLength(int tupleIndex, int fIdx);
- public int getTupleEndOffset(int tupleIndex);
+ int getTupleLength(int tupleIndex);
- public int getTupleStartOffset(int tupleIndex);
+ int getTupleEndOffset(int tupleIndex);
- public int getTupleCount();
+ int getTupleStartOffset(int tupleIndex);
- public ByteBuffer getBuffer();
+ int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx);
+
+ int getTupleCount();
+
+ ByteBuffer getBuffer();
+
+ void reset(ByteBuffer buffer);
- public void reset(ByteBuffer buffer);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
new file mode 100644
index 0000000..4da2afc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAppender.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameTupleAppender extends IFrameAppender {
+
+ boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException;
+
+ boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException;
+
+ boolean append(byte[] bytes, int offset, int length) throws HyracksDataException;
+
+ boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException;
+
+ boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException;
+
+ boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
+ int tIndex1) throws HyracksDataException;
+
+ boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
+ int offset1, int dataLen1) throws HyracksDataException;
+
+ boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index 8e35dda..538759e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -45,7 +45,7 @@ import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
* Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
* return from the {@link IFrameWriter#open()} call must clean up all partially
* allocated resources.
- *
+ *
* @author vinayakb
*/
public interface IFrameWriter {
@@ -56,9 +56,8 @@ public interface IFrameWriter {
/**
* Provide data to the stream of this {@link IFrameWriter}.
- *
- * @param buffer
- * - Buffer containing data.
+ *
+ * @param buffer - Buffer containing data.
* @throws HyracksDataException
*/
public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
@@ -66,14 +65,14 @@ public interface IFrameWriter {
/**
* Indicate that a failure was encountered and the current stream is to be
* aborted.
- *
+ *
* @throws HyracksDataException
*/
public void fail() throws HyracksDataException;
/**
* Close this {@link IFrameWriter} and give up all resources.
- *
+ *
* @throws HyracksDataException
*/
public void close() throws HyracksDataException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
new file mode 100644
index 0000000..902ae75
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NoShrinkVSizeFrame.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NoShrinkVSizeFrame extends VSizeFrame {
+ public NoShrinkVSizeFrame(IHyracksFrameMgrContext ctx) throws HyracksDataException {
+ super(ctx);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ buffer.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
new file mode 100644
index 0000000..a5a7f19
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/VSizeFrame.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Variable size frame. The buffer inside can be resized.
+ */
+public class VSizeFrame implements IFrame {
+
+ protected final int minFrameSize;
+ protected IHyracksFrameMgrContext ctx;
+ protected ByteBuffer buffer;
+
+ public VSizeFrame(IHyracksFrameMgrContext ctx) throws HyracksDataException {
+ this(ctx, ctx.getInitialFrameSize());
+ }
+
+ public VSizeFrame(IHyracksFrameMgrContext ctx, int frameSize) throws HyracksDataException {
+ this.minFrameSize = ctx.getInitialFrameSize();
+ this.ctx = ctx;
+ buffer = ctx.allocateFrame(frameSize);
+ }
+
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public void ensureFrameSize(int newSize) throws HyracksDataException {
+ if (newSize > getFrameSize()) {
+ buffer = ctx.reallocateFrame(buffer, newSize, true);
+ }
+ }
+
+ @Override
+ public void resize(int frameSize) throws HyracksDataException {
+ if (getFrameSize() != frameSize) {
+ buffer = ctx.reallocateFrame(buffer, frameSize, false);
+ }
+ }
+
+ @Override
+ public int getFrameSize() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public int getMinSize() {
+ return minFrameSize;
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ resize(minFrameSize);
+ buffer.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index eddc4df..d60ff6e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -14,17 +14,9 @@
*/
package edu.uci.ics.hyracks.api.context;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
-public interface IHyracksCommonContext {
- public int getFrameSize();
+public interface IHyracksCommonContext extends IHyracksFrameMgrContext{
public IIOManager getIOManager();
-
- public ByteBuffer allocateFrame() throws HyracksDataException;
-
- public void deallocateFrames(int frameCount);
}