You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:28 UTC
[11/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
index 8877df4..ccf7276 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -26,7 +26,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,7 +45,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
private final int[] groupFields;
private final FrameTupleAccessor accessor0;
private final FrameTupleAccessor accessor1;
- private final ByteBuffer copyFrame;
+ private final IFrame copyFrame;
private final IBinaryComparator[] comparators;
private final KVIterator kvi;
private final Reducer<K2, V2, K3, V3> reducer;
@@ -53,7 +55,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
private boolean first;
private boolean groupStarted;
- private List<ByteBuffer> group;
+ private List<IFrame> group;
private int bPtr;
private FrameTupleAppender fta;
private Counter keyCounter;
@@ -66,10 +68,10 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
this.ctx = ctx;
this.helper = helper;
this.groupFields = groupFields;
- accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- copyFrame = ctx.allocateFrame();
- accessor1.reset(copyFrame);
+ accessor0 = new FrameTupleAccessor(recordDescriptor);
+ accessor1 = new FrameTupleAccessor(recordDescriptor);
+ copyFrame = new VSizeFrame(ctx);
+ accessor1.reset(copyFrame.getBuffer());
comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -79,17 +81,17 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
this.taId = taId;
this.taskAttemptContext = taskAttemptContext;
- kvi = new KVIterator(ctx, helper, recordDescriptor);
+ kvi = new KVIterator(helper, recordDescriptor);
}
@Override
public void open() throws HyracksDataException {
first = true;
groupStarted = false;
- group = new ArrayList<ByteBuffer>();
+ group = new ArrayList<>();
bPtr = 0;
- group.add(ctx.allocateFrame());
- fta = new FrameTupleAppender(ctx.getFrameSize());
+ group.add(new VSizeFrame(ctx));
+ fta = new FrameTupleAppender();
keyCounter = new GenericCounter();
valueCounter = new GenericCounter();
}
@@ -104,6 +106,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
first = false;
} else {
if (i == 0) {
+ accessor1.reset(copyFrame.getBuffer());
switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
} else {
switchGroupIfRequired(accessor0, i - 1, accessor0, i);
@@ -111,20 +114,21 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
}
accumulate(accessor0, i);
}
- FrameUtils.copy(buffer, copyFrame);
+ copyFrame.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
}
private void accumulate(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
if (!fta.append(accessor, tIndex)) {
++bPtr;
if (group.size() <= bPtr) {
- group.add(ctx.allocateFrame());
+ group.add(new VSizeFrame(ctx));
}
fta.reset(group.get(bPtr), true);
if (!fta.append(accessor, tIndex)) {
throw new HyracksDataException("Record size ("
+ (accessor.getTupleEndOffset(tIndex) - accessor.getTupleStartOffset(tIndex))
- + ") larger than frame size (" + group.get(bPtr).capacity() + ")");
+ + ") larger than frame size (" + group.get(bPtr).getBuffer().capacity() + ")");
}
}
}
@@ -137,7 +141,7 @@ public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
}
}
- private void groupInit() {
+ private void groupInit() throws HyracksDataException {
groupStarted = true;
bPtr = 0;
fta.reset(group.get(0), true);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 295fe1f..6708e17 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -22,8 +22,11 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.NoShrinkVSizeFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -34,26 +37,29 @@ import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
public class ShuffleFrameReader implements IFrameReader {
private final IHyracksTaskContext ctx;
private final NonDeterministicChannelReader channelReader;
private final HadoopHelper helper;
private final RecordDescriptor recordDescriptor;
+ private final IFrame vframe;
private List<RunFileWriter> runFileWriters;
+ private List<Integer> runFileMaxFrameSize;
private RunFileReader reader;
public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
this.ctx = ctx;
this.channelReader = channelReader;
- helper = new HadoopHelper(mConfig);
+ this.helper = new HadoopHelper(mConfig);
this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+ this.vframe = new NoShrinkVSizeFrame(ctx);
}
@Override
@@ -61,21 +67,28 @@ public class ShuffleFrameReader implements IFrameReader {
channelReader.open();
int nSenders = channelReader.getSenderPartitionCount();
runFileWriters = new ArrayList<RunFileWriter>();
+ runFileMaxFrameSize = new ArrayList<>();
RunInfo[] infos = new RunInfo[nSenders];
- FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- IInputChannel[] channels = channelReader.getChannels();
+ FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
while (true) {
int entry = channelReader.findNextSender();
if (entry < 0) {
break;
}
RunInfo info = infos[entry];
- IInputChannel channel = channels[entry];
- ByteBuffer netBuffer = channel.getNextBuffer();
- accessor.reset(netBuffer);
+ ByteBuffer netBuffer = channelReader.getNextBuffer(entry);
+ netBuffer.clear();
+ int nBlocks = FrameHelper.deserializeNumOfMinFrame(netBuffer);
+
+ if (nBlocks > 1) {
+ netBuffer = getCompleteBuffer(nBlocks, netBuffer, entry);
+ }
+
+ accessor.reset(netBuffer, 0, netBuffer.limit());
int nTuples = accessor.getTupleCount();
for (int i = 0; i < nTuples; ++i) {
- int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(), FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+ int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(),
+ accessor.getAbsoluteFieldStartOffset(i, HadoopHelper.BLOCKID_FIELD_INDEX));
if (info == null) {
info = new RunInfo();
info.reset(tBlockId);
@@ -86,7 +99,10 @@ public class ShuffleFrameReader implements IFrameReader {
}
info.write(accessor, i);
}
- channel.recycleBuffer(netBuffer);
+
+ if (nBlocks == 1) {
+ channelReader.recycleBuffer(entry, netBuffer);
+ }
}
for (int i = 0; i < infos.length; ++i) {
RunInfo info = infos[i];
@@ -94,7 +110,6 @@ public class ShuffleFrameReader implements IFrameReader {
info.close();
}
}
- infos = null;
FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
int framesLimit = helper.getSortFrameLimit(ctx);
@@ -103,22 +118,40 @@ public class ShuffleFrameReader implements IFrameReader {
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- List<IFrameReader> runs = new LinkedList<IFrameReader>();
- for (RunFileWriter rfw : runFileWriters) {
- runs.add(rfw.createReader());
+ List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
+ for (int i = 0; i < runFileWriters.size(); i++) {
+ runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createReader(), runFileMaxFrameSize.get(i)));
}
RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
- recordDescriptor, framesLimit, rfw);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
+ comparators, null, recordDescriptor, framesLimit, rfw);
merger.process();
reader = rfw.createReader();
reader.open();
}
+ private ByteBuffer getCompleteBuffer(int nBlocks, ByteBuffer netBuffer, int entry) throws HyracksDataException {
+ vframe.reset();
+ vframe.ensureFrameSize(vframe.getMinSize() * nBlocks);
+ FrameUtils.copyWholeFrame(netBuffer, vframe.getBuffer());
+ channelReader.recycleBuffer(entry, netBuffer);
+ for (int i = 1; i < nBlocks; ++i) {
+ netBuffer = channelReader.getNextBuffer(entry);
+ netBuffer.clear();
+ vframe.getBuffer().put(netBuffer);
+ channelReader.recycleBuffer(entry, netBuffer);
+ }
+ if (vframe.getBuffer().hasRemaining()) { // bigger frame
+ FrameHelper.clearRemainingFrame(vframe.getBuffer(), vframe.getBuffer().position());
+ }
+ vframe.getBuffer().flip();
+ return vframe.getBuffer();
+ }
+
@Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- return reader.nextFrame(buffer);
+ public boolean nextFrame(IFrame frame) throws HyracksDataException {
+ return reader.nextFrame(frame);
}
@Override
@@ -127,20 +160,22 @@ public class ShuffleFrameReader implements IFrameReader {
}
private class RunInfo {
- private final ByteBuffer buffer;
+ private final IFrame buffer;
private final FrameTupleAppender fta;
private FileReference file;
private RunFileWriter rfw;
private int blockId;
+ private int maxFrameSize = ctx.getInitialFrameSize();
public RunInfo() throws HyracksDataException {
- buffer = ctx.allocateFrame();
- fta = new FrameTupleAppender(ctx.getFrameSize());
+ buffer = new VSizeFrame(ctx);
+ fta = new FrameTupleAppender();
}
public void reset(int blockId) throws HyracksDataException {
this.blockId = blockId;
+ this.maxFrameSize = ctx.getInitialFrameSize();
fta.reset(buffer, true);
try {
file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
@@ -165,15 +200,15 @@ public class ShuffleFrameReader implements IFrameReader {
flush();
rfw.close();
runFileWriters.add(rfw);
+ runFileMaxFrameSize.add(maxFrameSize);
}
private void flush() throws HyracksDataException {
if (fta.getTupleCount() <= 0) {
return;
}
- buffer.limit(buffer.capacity());
- buffer.position(0);
- rfw.nextFrame(buffer);
+ maxFrameSize = buffer.getFrameSize() > maxFrameSize ? buffer.getFrameSize() : maxFrameSize;
+ rfw.nextFrame((ByteBuffer) buffer.getBuffer().clear());
fta.reset(buffer, true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index c4b23fd..5eadbca 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -66,5 +66,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.16-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 607a817..12ff8e8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -18,6 +18,8 @@ import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -42,27 +44,58 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit
public void open() throws HyracksDataException {
}
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- synchronized (this) {
- while (!failed && !eos && availableFrames <= 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- if (failed) {
- throw new HyracksDataException("Failure occurred on input");
+ private synchronized boolean canGetNextBuffer() throws HyracksDataException {
+ while (!failed && !eos && availableFrames <= 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
- if (availableFrames <= 0 && eos) {
- return false;
+ }
+ if (failed) {
+ throw new HyracksDataException("Failure occurred on input");
+ }
+ if (availableFrames <= 0 && eos) {
+ return false;
+ }
+ --availableFrames;
+ return true;
+ }
+
+ /**
+ * This implementation works under the truth that one Channel is never shared by two readers.
+ * More precisely, one channel only has exact one reader and one writer side.
+ *
+ * @param frame outputFrame
+ * @return {@code true} if succeed to read the data from the channel to the {@code frame}.
+ * Otherwise return {@code false} if the end of stream is reached.
+ * @throws HyracksDataException
+ */
+ @Override
+ public boolean nextFrame(IFrame frame) throws HyracksDataException {
+ if (!canGetNextBuffer()) {
+ return false;
+ }
+ frame.reset();
+ ByteBuffer srcFrame = channel.getNextBuffer();
+ int nBlocks = FrameHelper.deserializeNumOfMinFrame(srcFrame);
+ frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+ FrameUtils.copyWholeFrame(srcFrame, frame.getBuffer());
+ channel.recycleBuffer(srcFrame);
+
+ for (int i = 1; i < nBlocks; ++i) {
+ if (!canGetNextBuffer()) {
+ throw new HyracksDataException(
+ "InputChannelReader is waiting for the new frames, but the input stream is finished");
}
- --availableFrames;
+ srcFrame = channel.getNextBuffer();
+ frame.getBuffer().put(srcFrame);
+ channel.recycleBuffer(srcFrame);
+ }
+ if (frame.getBuffer().hasRemaining()) { // bigger frame
+ FrameHelper.clearRemainingFrame(frame.getBuffer(), frame.getBuffer().position());
}
- ByteBuffer srcBuffer = channel.getNextBuffer();
- FrameUtils.copy(srcBuffer, buffer);
- channel.recycleBuffer(srcBuffer);
+ frame.getBuffer().flip();
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 7f447c6..0c25d54 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.collectors;
+import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,12 +68,27 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar
return nSenderPartitions;
}
- public void open() throws HyracksDataException {
- lastReadSender = -1;
+ public synchronized ByteBuffer getNextBuffer(int index) throws HyracksDataException {
+ while ((availableFrameCounts[index] <= 0)) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (--availableFrameCounts[index] == 0) {
+ frameAvailability.clear(index);
+ }
+ return channels[index].getNextBuffer();
+
}
- public IInputChannel[] getChannels() {
- return channels;
+ public void recycleBuffer(int index, ByteBuffer frame) {
+ channels[index].recycleBuffer(frame);
+ }
+
+ public void open() throws HyracksDataException {
+ lastReadSender = -1;
}
public synchronized int findNextSender() throws HyracksDataException {
@@ -83,9 +99,6 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar
}
if (lastReadSender >= 0) {
assert availableFrameCounts[lastReadSender] > 0;
- if (--availableFrameCounts[lastReadSender] == 0) {
- frameAvailability.clear(lastReadSender);
- }
return lastReadSender;
}
if (!failSenders.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
index e107cfa..6dd6972 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -16,7 +16,8 @@ package edu.uci.ics.hyracks.dataflow.std.collectors;
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -34,16 +35,27 @@ public class NonDeterministicFrameReader implements IFrameReader {
}
@Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public boolean nextFrame(IFrame frame) throws HyracksDataException {
int index = channelReader.findNextSender();
- if (index >= 0) {
- IInputChannel[] channels = channelReader.getChannels();
- ByteBuffer srcFrame = channels[index].getNextBuffer();
- FrameUtils.copy(srcFrame, buffer);
- channels[index].recycleBuffer(srcFrame);
- return true;
+ if (index < 0) {
+ return false;
}
- return false;
+ frame.reset();
+ ByteBuffer srcFrame = channelReader.getNextBuffer(index);
+ int nBlocks = FrameHelper.deserializeNumOfMinFrame(srcFrame);
+ frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+ FrameUtils.copyWholeFrame(srcFrame, frame.getBuffer());
+ channelReader.recycleBuffer(index, srcFrame);
+ for (int i = 1; i < nBlocks; ++i) {
+ srcFrame = channelReader.getNextBuffer(index);
+ frame.getBuffer().put(srcFrame);
+ channelReader.recycleBuffer(index, srcFrame);
+ }
+ if (frame.getBuffer().hasRemaining()) { // bigger frame
+ FrameHelper.clearRemainingFrame(frame.getBuffer(), frame.getBuffer().position());
+ }
+ frame.getBuffer().flip();
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
index 2dda9cc..125d07a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -14,11 +14,12 @@
*/
package edu.uci.ics.hyracks.dataflow.std.collectors;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
@@ -54,13 +55,13 @@ public class SortMergeFrameReader implements IFrameReader {
@Override
public void open() throws HyracksDataException {
if (maxConcurrentMerges >= nSenders) {
- List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+ List<IFrame> inFrames = new ArrayList<>(nSenders);
for (int i = 0; i < nSenders; ++i) {
- inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+ inFrames.add(new VSizeFrame(ctx));
}
- List<IFrameReader> batch = new ArrayList<IFrameReader>();
+ List<IFrameReader> batch = new ArrayList<IFrameReader>(nSenders);
pbm.getNextBatch(batch, nSenders);
- merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
+ merger = new RunMergingFrameReader(ctx, batch, inFrames, sortFields,
comparators, nmkComputer, recordDescriptor);
} else {
// multi level merge.
@@ -70,10 +71,8 @@ public class SortMergeFrameReader implements IFrameReader {
}
@Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- buffer.position(buffer.capacity());
- buffer.limit(buffer.capacity());
- return merger.nextFrame(buffer);
+ public boolean nextFrame(IFrame frame) throws HyracksDataException {
+ return merger.nextFrame(frame);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index dcca28e..edf4cc9 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -17,19 +17,22 @@ package edu.uci.ics.hyracks.dataflow.std.connectors;
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class LocalityAwarePartitionDataWriter implements IFrameWriter {
private final IFrameWriter[] pWriters;
- private final FrameTupleAppender[] appenders;
+ private final IFrameTupleAppender[] appenders;
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
@@ -38,17 +41,17 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
ILocalityMap localityMap, int senderIndex) throws HyracksDataException {
int[] consumerPartitions = localityMap.getConsumers(senderIndex, nConsumerPartitions);
pWriters = new IFrameWriter[consumerPartitions.length];
- appenders = new FrameTupleAppender[consumerPartitions.length];
+ appenders = new IFrameTupleAppender[consumerPartitions.length];
for (int i = 0; i < consumerPartitions.length; ++i) {
try {
pWriters[i] = pwFactory.createFrameWriter(consumerPartitions[i]);
- appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
- appenders[i].reset(ctx.allocateFrame(), true);
+ appenders[i] = new FrameTupleAppender();
+ appenders[i].reset(new VSizeFrame(ctx), true);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
- tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ tupleAccessor = new FrameTupleAccessor(recordDescriptor);
this.tpc = tpc;
}
@@ -61,7 +64,6 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
public void open() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
pWriters[i].open();
- appenders[i].reset(appenders[i].getBuffer(), true);
}
}
@@ -77,15 +79,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
int tupleCount = tupleAccessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
int h = pWriters.length == 1 ? 0 : tpc.partition(tupleAccessor, i, pWriters.length);
- FrameTupleAppender appender = appenders[h];
- if (!appender.append(tupleAccessor, i)) {
- ByteBuffer appenderBuffer = appender.getBuffer();
- flushFrame(appenderBuffer, pWriters[h]);
- appender.reset(appenderBuffer, true);
- if (!appender.append(tupleAccessor, i)) {
- throw new HyracksDataException("Record size (" + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i)) + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
- }
+ FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
}
}
@@ -101,12 +95,6 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
}
}
- private void flushFrame(ByteBuffer buffer, IFrameWriter frameWriter) throws HyracksDataException {
- buffer.position(0);
- buffer.limit(buffer.capacity());
- frameWriter.nextFrame(buffer);
- }
-
/*
* (non-Javadoc)
*
@@ -115,9 +103,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
@Override
public void close() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
- if (appenders[i].getTupleCount() > 0) {
- flushFrame(appenders[i].getBuffer(), pWriters[i]);
- }
+ appenders[i].flush(pWriters[i], true);
pWriters[i].close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index ea586fc..74f16d1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -19,12 +19,14 @@ import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class PartitionDataWriter implements IFrameWriter {
private final int consumerPartitionCount;
@@ -33,7 +35,7 @@ public class PartitionDataWriter implements IFrameWriter {
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
private final IHyracksTaskContext ctx;
- private boolean allocated = false;
+ private boolean allocatedFrame = false;
public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
@@ -43,12 +45,12 @@ public class PartitionDataWriter implements IFrameWriter {
for (int i = 0; i < consumerPartitionCount; ++i) {
try {
pWriters[i] = pwFactory.createFrameWriter(i);
- appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
+ appenders[i] = new FrameTupleAppender();
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
- tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ tupleAccessor = new FrameTupleAccessor(recordDescriptor);
this.tpc = tpc;
this.ctx = ctx;
}
@@ -56,21 +58,13 @@ public class PartitionDataWriter implements IFrameWriter {
@Override
public void close() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
- if (allocated) {
- if (appenders[i].getTupleCount() > 0) {
- flushFrame(appenders[i].getBuffer(), pWriters[i]);
- }
+ if (allocatedFrame) {
+ appenders[i].flush(pWriters[i], true);
}
pWriters[i].close();
}
}
- private void flushFrame(ByteBuffer buffer, IFrameWriter frameWriter) throws HyracksDataException {
- buffer.position(0);
- buffer.limit(buffer.capacity());
- frameWriter.nextFrame(buffer);
- }
-
@Override
public void open() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
@@ -80,34 +74,22 @@ public class PartitionDataWriter implements IFrameWriter {
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (!allocated) {
+ if (!allocatedFrame) {
allocateFrames();
- allocated = true;
+ allocatedFrame = true;
}
tupleAccessor.reset(buffer);
int tupleCount = tupleAccessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
- FrameTupleAppender appender = appenders[h];
- if (!appender.append(tupleAccessor, i)) {
- ByteBuffer appenderBuffer = appender.getBuffer();
- flushFrame(appenderBuffer, pWriters[h]);
- appender.reset(appenderBuffer, true);
- if (!appender.append(tupleAccessor, i)) {
- throw new HyracksDataException("Record size ("
- + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i))
- + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
- }
+ FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
+
}
}
- /**
- * @throws HyracksDataException
- */
private void allocateFrames() throws HyracksDataException {
for (int i = 0; i < appenders.length; ++i) {
- appenders[i].reset(ctx.allocateFrame(), true);
+ appenders[i].reset(new VSizeFrame(ctx), true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 5be1eab..765e223 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -18,11 +18,10 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,7 +40,8 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
this(fieldParserFactories, fieldDelimiter, '\"');
}
- public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter, char quote) {
+ public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter,
+ char quote) {
this.valueParserFactories = fieldParserFactories;
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
@@ -57,8 +57,8 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
for (int i = 0; i < valueParserFactories.length; ++i) {
valueParsers[i] = valueParserFactories[i].createValueParser();
}
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ IFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender appender = new FrameTupleAppender();
appender.reset(frame, true);
ArrayTupleBuilder tb = new ArrayTupleBuilder(valueParsers.length);
DataOutput dos = tb.getDataOutput();
@@ -80,18 +80,10 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
tb.addFieldEndOffset();
}
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize()
- + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
+ appender.flush(writer, true);
} catch (IOException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 99f5a5f..7b2e8a0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -37,7 +37,7 @@ import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodeP
public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
@@ -45,11 +45,6 @@ public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOpe
private String delim;
- /**
- * @param spec
- * @param inputArity
- * @param outputArity
- */
public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider,
String delim) {
super(spec, 1, 0);
@@ -74,7 +69,7 @@ public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOpe
// Output files
final FileSplit[] splits = fileSplitProvider.getFileSplits();
// Frame accessor
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
// Record descriptor
final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index a4970ea..4d62fa0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -14,11 +14,12 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -33,7 +34,6 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
@@ -76,10 +76,8 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
}
RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
- final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
- internalRecordDescriptor);
- final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
- internalRecordDescriptor);
+ final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(internalRecordDescriptor);
+ final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(internalRecordDescriptor);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -118,14 +116,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
private int lastBufIndex;
- private ByteBuffer outputFrame;
+ private IFrame outputFrame;
private FrameTupleAppender outputAppender;
- private FrameTupleAppender stateAppender = new FrameTupleAppender(ctx.getFrameSize());
+ private FrameTupleAppender stateAppender = new FrameTupleAppender();
private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
private final TuplePointer storedTuplePointer = new TuplePointer();
- private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+ private final List<IFrame> frames = new ArrayList<>();
/**
* A tuple is "pointed" to by 3 entries in the tPointers array. [0]
@@ -153,7 +151,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
table.getTuplePointer(entry, offset, storedTuplePointer);
int fIndex = storedTuplePointer.frameIndex;
int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(fIndex));
+ storedKeysAccessor1.reset(frames.get(fIndex).getBuffer());
int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
@@ -191,7 +189,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
table.getTuplePointer(entry, offset++, storedTuplePointer);
if (storedTuplePointer.frameIndex < 0)
break;
- storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+ storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex).getBuffer());
int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
if (c == 0) {
foundGroup = true;
@@ -232,7 +230,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
}
@Override
- public List<ByteBuffer> getFrames() {
+ public List<IFrame> getFrames() {
return frames;
}
@@ -244,11 +242,11 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
@Override
public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
if (outputFrame == null) {
- outputFrame = ctx.allocateFrame();
+ outputFrame = new VSizeFrame(ctx);
}
if (outputAppender == null) {
- outputAppender = new FrameTupleAppender(outputFrame.capacity());
+ outputAppender = new FrameTupleAppender();
}
outputAppender.reset(outputFrame, true);
@@ -265,7 +263,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
int bIndex = storedTuplePointer.frameIndex;
int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(bIndex));
+ storedKeysAccessor1.reset(frames.get(bIndex).getBuffer());
outputTupleBuilder.reset();
for (int k = 0; k < storedKeys.length; k++) {
@@ -285,8 +283,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputAppender.reset(outputFrame, true);
+ outputAppender.flush(writer, true);
if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
throw new HyracksDataException(
@@ -296,10 +293,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
} while (true);
}
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputAppender.reset(outputFrame, true);
- }
+ outputAppender.flush(writer, true);
aggregator.close();
return;
}
@@ -311,8 +305,8 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
int frameIndex = storedTuplePointer.frameIndex;
int tupleIndex = storedTuplePointer.tupleIndex;
// Get the frame containing the value
- ByteBuffer buffer = frames.get(frameIndex);
- storedKeysAccessor1.reset(buffer);
+ IFrame buffer = frames.get(frameIndex);
+ storedKeysAccessor1.reset(buffer.getBuffer());
outputTupleBuilder.reset();
for (int k = 0; k < storedKeys.length; k++) {
@@ -332,18 +326,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputAppender.reset(outputFrame, true);
+ outputAppender.flush(writer, true);
if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
throw new HyracksDataException("The output item is too large to be fit into a frame.");
}
}
}
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputAppender.reset(outputFrame, true);
- }
+ outputAppender.flush(writer, true);
aggregator.close();
}
@@ -372,19 +362,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
if (frames.size() < framesLimit) {
// Insert a new frame
- ByteBuffer frame = ctx.allocateFrame();
- frame.position(0);
- frame.limit(frame.capacity());
+ IFrame frame = new VSizeFrame(ctx);
frames.add(frame);
stateAppender.reset(frame, true);
lastBufIndex = frames.size() - 1;
} else {
// Reuse an old frame
lastBufIndex++;
- ByteBuffer frame = frames.get(lastBufIndex);
- frame.position(0);
- frame.limit(frame.capacity());
- stateAppender.reset(frame, true);
+ stateAppender.reset(frames.get(lastBufIndex), true);
}
return true;
}
@@ -398,7 +383,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
table.getTuplePointer(mTable, mRow, storedTuplePointer);
int mFrame = storedTuplePointer.frameIndex;
int mTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(mFrame));
+ storedKeysAccessor1.reset(frames.get(mFrame).getBuffer());
int a = offset;
int b = a;
@@ -416,7 +401,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
table.getTuplePointer(bTable, bRow, storedTuplePointer);
int bFrame = storedTuplePointer.frameIndex;
int bTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(bFrame));
+ storedKeysAccessor2.reset(frames.get(bFrame).getBuffer());
cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
}
if (cmp > 0) {
@@ -438,7 +423,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
table.getTuplePointer(cTable, cRow, storedTuplePointer);
int cFrame = storedTuplePointer.frameIndex;
int cTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(cFrame));
+ storedKeysAccessor2.reset(frames.get(cFrame).getBuffer());
cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
}
if (cmp < 0) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 6ac2a6d..b0c7e4d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
-import java.nio.ByteBuffer;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -29,7 +29,7 @@ public interface ISpillableTable {
public int getFrameCount();
- public List<ByteBuffer> getFrames();
+ public List<IFrame> getFrames();
public void sortFrames() throws HyracksDataException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 8e9e8b8..e683e47 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -61,7 +61,7 @@ class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOpera
this.spillableTableFactory = spillableTableFactory;
this.inRecordDescriptor = inRecordDescriptor;
this.outRecordDescriptor = outRecordDescriptor;
- this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+ this.accessor = new FrameTupleAccessor(inRecordDescriptor);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index a55443c..9e3d4fc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -14,14 +14,15 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group.external;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -33,7 +34,7 @@ import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderAccessor;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -60,12 +61,12 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
/**
* Input frames, one for each run file.
*/
- private List<ByteBuffer> inFrames;
+ private List<IFrame> inFrames;
/**
* Output frame.
*/
- private ByteBuffer outFrame, writerFrame;
- private final FrameTupleAppender outAppender;
+ private IFrame outFrame, writerFrame;
+ private final FrameTupleAppenderAccessor outAppender;
private FrameTupleAppender writerAppender;
private LinkedList<RunFileReader> runs;
private ExternalGroupState aggState;
@@ -76,7 +77,6 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
private int runFrameLimit = 1;
private int[] currentFrameIndexInRun;
private int[] currentRunFrames;
- private final FrameTupleAccessor outFrameAccessor;
ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
@@ -108,8 +108,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
this.ctx = ctx;
- outAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+ outAppender = new FrameTupleAppenderAccessor(outRecordDescriptor);
this.isOutputSorted = isOutputSorted;
this.framesLimit = framesLimit;
this.outRecordDescriptor = outRecordDescriptor;
@@ -132,10 +131,9 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
} else {
aggState = null;
runs = new LinkedList<RunFileReader>(runs);
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
+ inFrames = new ArrayList<>();
+ outFrame = new VSizeFrame(ctx);
outAppender.reset(outFrame, true);
- outFrameAccessor.reset(outFrame);
while (runs.size() > 0) {
try {
doPass(runs);
@@ -160,7 +158,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
boolean finalPass = false;
while (inFrames.size() + 2 < framesLimit) {
- inFrames.add(ctx.allocateFrame());
+ inFrames.add(new VSizeFrame(ctx));
}
int runNumber;
if (runs.size() + 2 <= framesLimit) {
@@ -184,8 +182,8 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
RunFileReader[] runFileReaders = new RunFileReader[runNumber];
FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
- runNumber, comparator, keyFields, nmkComputer);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(runNumber, comparator, keyFields,
+ nmkComputer);
/**
* current tuple index in each run
*/
@@ -203,8 +201,8 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
for (int j = 0; j < runFrameLimit; j++) {
int frameIndex = currentFrameIndexInRun[runIndex] + j;
if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(outRecordDescriptor);
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
currentRunFrames[runIndex]++;
if (j == 0)
setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
@@ -224,11 +222,11 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
ReferenceEntry top = topTuples.peek();
int tupleIndex = top.getTupleIndex();
int runIndex = topTuples.peek().getRunid();
- FrameTupleAccessor fta = top.getAccessor();
+ IFrameTupleAccessor fta = top.getAccessor();
- int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ int currentTupleInOutFrame = outAppender.getTupleCount() - 1;
if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+ || compareFrameTuples(fta, tupleIndex, outAppender, currentTupleInOutFrame) != 0) {
/**
* Initialize the first output record Reset the
* tuple builder
@@ -259,7 +257,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
* outFrame
*/
- aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+ aggregator.aggregate(fta, tupleIndex, outAppender, currentTupleInOutFrame, aggregateState);
}
tupleIndices[runIndex]++;
@@ -295,49 +293,42 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
}
if (writerFrame == null) {
- writerFrame = ctx.allocateFrame();
+ writerFrame = new VSizeFrame(ctx);
}
if (writerAppender == null) {
- writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+ writerAppender = new FrameTupleAppender();
writerAppender.reset(writerFrame, true);
}
- outFrameAccessor.reset(outFrame);
-
- for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+ for (int i = 0; i < outAppender.getTupleCount(); i++) {
finalTupleBuilder.reset();
for (int k = 0; k < storedKeys.length; k++) {
- finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
+ finalTupleBuilder.addField(outAppender, i, storedKeys[k]);
}
if (isFinal) {
- aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+ aggregator.outputFinalResult(finalTupleBuilder, outAppender, i, aggregateState);
} else {
- aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+ aggregator.outputPartialResult(finalTupleBuilder, outAppender, i, aggregateState);
}
if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
+ writerAppender.flush(writer, true);
if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
}
}
}
- if (writerAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
- }
+ writerAppender.flush(writer, true);
- outAppender.reset(outFrame, true);
}
private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
@@ -377,7 +368,7 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
for (int j = 0; j < runFrameLimit; j++) {
int frameIndex = currentFrameIndexInRun[runIndex] + j;
if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex).getBuffer());
existNext = true;
currentRunFrames[runIndex]++;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index 3c0eb2b..0102e65 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -19,7 +19,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -32,6 +34,7 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -40,7 +43,7 @@ class GroupingHashTable {
/**
* The pointers in the link store 3 int values for each entry in the
* hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
+ *
* @author vinayakb
*/
private static class Link {
@@ -67,7 +70,7 @@ class GroupingHashTable {
private static final int INIT_AGG_STATE_SIZE = 8;
private final IHyracksTaskContext ctx;
- private final List<ByteBuffer> buffers;
+ private final List<IFrame> buffers;
private final Link[] table;
/**
* Aggregate states: a list of states for all groups maintained in the main
@@ -84,6 +87,7 @@ class GroupingHashTable {
private final ITuplePartitionComputer tpc;
private final IAggregatorDescriptor aggregator;
+ private final IFrame outputFrame;
private final FrameTupleAppender appender;
private final FrameTupleAccessor storedKeysAccessor;
@@ -96,7 +100,7 @@ class GroupingHashTable {
throws HyracksDataException {
this.ctx = ctx;
- buffers = new ArrayList<ByteBuffer>();
+ buffers = new ArrayList<>();
table = new Link[tableSize];
keys = fields;
@@ -127,10 +131,10 @@ class GroupingHashTable {
accumulatorSize = 0;
RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
- storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+ storedKeysAccessor = new FrameTupleAccessor(storedKeysRecordDescriptor);
lastBIndex = -1;
- appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender = new FrameTupleAppender();
addNewBuffer();
@@ -140,14 +144,13 @@ class GroupingHashTable {
stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
}
outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ outputFrame = new VSizeFrame(ctx);
}
private void addNewBuffer() throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- buffer.position(0);
- buffer.limit(buffer.capacity());
- buffers.add(buffer);
- appender.reset(buffer, true);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ buffers.add(frame);
+ appender.reset(frame, true);
++lastBIndex;
}
@@ -161,7 +164,7 @@ class GroupingHashTable {
for (int i = 0; i < link.size; i += 3) {
int sbIndex = link.pointers[i];
int stIndex = link.pointers[i + 1];
- storedKeysAccessor.reset(buffers.get(sbIndex));
+ storedKeysAccessor.reset(buffers.get(sbIndex).getBuffer());
int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
if (c == 0) {
saIndex = link.pointers[i + 2];
@@ -206,8 +209,7 @@ class GroupingHashTable {
}
void write(IFrameWriter writer) throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- appender.reset(buffer, true);
+ appender.reset(outputFrame, true);
for (int i = 0; i < table.length; ++i) {
Link link = table[i];
@@ -216,7 +218,7 @@ class GroupingHashTable {
int bIndex = link.pointers[j];
int tIndex = link.pointers[j + 1];
int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = buffers.get(bIndex);
+ ByteBuffer keyBuffer = buffers.get(bIndex).getBuffer();
storedKeysAccessor.reset(keyBuffer);
// copy keys
@@ -228,22 +230,13 @@ class GroupingHashTable {
aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor, tIndex,
aggregateStates[aIndex]);
- if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- writer.nextFrame(buffer);
- appender.reset(buffer, true);
- if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- throw new HyracksDataException("Cannot write the aggregation output into a frame.");
- }
- }
+ FrameUtils.appendSkipEmptyFieldToWriter(writer, appender, outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize());
}
}
}
- if (appender.getTupleCount() != 0) {
- writer.nextFrame(buffer);
- }
+ appender.flush(writer, true);
}
void close() throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
index 8e49a9a..998d882 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupBuildOperatorNodePushable.java
@@ -44,7 +44,7 @@ class HashGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorN
IAggregatorDescriptorFactory aggregatorFactory, int tableSize, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) {
this.ctx = ctx;
- this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+ this.accessor = new FrameTupleAccessor(inRecordDescriptor);
this.stateId = stateId;
this.keys = keys;
this.tpcf = tpcf;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 9ce70c1..4dbf03b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -21,8 +21,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -52,12 +50,6 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final ByteBuffer copyFrame = ctx.allocateFrame();
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
- copyFrameAccessor.reset(copyFrame);
- ByteBuffer outFrame = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(outFrame, true);
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
outRecordDescriptor, writer);
pgw.open();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 45f0488..559dec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -16,15 +16,17 @@ package edu.uci.ics.hyracks.dataflow.std.group.preclustered;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -35,7 +37,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
private final IBinaryComparator[] comparators;
private final IAggregatorDescriptor aggregator;
private final AggregateState aggregateState;
- private final ByteBuffer copyFrame;
+ private final IFrame copyFrame;
private final FrameTupleAccessor inFrameAccessor;
private final FrameTupleAccessor copyFrameAccessor;
@@ -62,15 +64,15 @@ public class PreclusteredGroupWriter implements IFrameWriter {
this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
groupFields, writer);
this.aggregateState = aggregator.createAggregateStates();
- copyFrame = ctx.allocateFrame();
- inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
+ copyFrame = new VSizeFrame(ctx);
+ inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(inRecordDesc);
+ copyFrameAccessor.reset(copyFrame.getBuffer());
- ByteBuffer outFrame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ VSizeFrame outFrame = new VSizeFrame(ctx);
+ FrameTupleAppender appender = new FrameTupleAppender();
appender.reset(outFrame, true);
- appenderWrapper = new FrameTupleAppenderWrapper(appender, outFrame, writer);
+ appenderWrapper = new FrameTupleAppenderWrapper(appender, writer);
tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
}
@@ -105,7 +107,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
}
}
- FrameUtils.copy(buffer, copyFrame);
+ copyFrame.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+ copyFrameAccessor.reset(copyFrame.getBuffer());
}
private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
@@ -145,9 +149,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
throws HyracksDataException {
for (int i = 0; i < comparators.length; ++i) {
int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int s1 = a1.getAbsoluteFieldStartOffset(t1Idx, fIdx);
int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int s2 = a2.getAbsoluteFieldStartOffset(t2Idx, fIdx);
int l2 = a2.getFieldLength(t2Idx, fIdx);
if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
return false;
@@ -165,6 +169,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
@Override
public void close() throws HyracksDataException {
if (!isFailed && !first) {
+ assert(copyFrameAccessor.getTupleCount() > 0);
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
appenderWrapper.flush();
}