You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:19 UTC
[02/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
index e46e685..790552c 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -19,10 +19,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,9 +42,7 @@ public class WordTupleParserFactory implements ITupleParserFactory {
@Override
public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
try {
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
+ FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
DataOutput dos = tb.getDataOutput();
@@ -54,17 +52,10 @@ public class WordTupleParserFactory implements ITupleParserFactory {
tb.reset();
utf8StringParser.parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
tb.addFieldEndOffset();
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
+ appender.flush(writer, true);
} catch (IOException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index a437f37..e22f27f 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -56,7 +56,7 @@
<configuration>
<programs>
<program>
- <mainClass>edu.uci.ics.hyracks.examples.tpch.client.Main</mainClass>
+ <mainClass>edu.uci.ics.hyracks.examples.tpch.client.Sort</mainClass>
<name>tpchclient</name>
</program>
</programs>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
new file mode 100644
index 0000000..17f1d3b
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Record descriptors, value-parser factories and small helpers shared by the TPC-H example
+ * client programs in this package.
+ */
+public class Common {
+    /** Layout of a CUSTOMER tuple: eight UTF-8 string fields. */
+    static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    /** Layout of an ORDERS tuple: nine UTF-8 string fields. */
+    static RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE });
+
+    /** Layout of a joined tuple: seventeen UTF-8 string fields (8 customer + 9 orders). */
+    static RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE });
+
+    /** One UTF-8 string parser per ORDERS column (nine in total). */
+    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE };
+
+    private Common() {
+        // Static members only; not meant to be instantiated.
+    }
+
+    /**
+     * Parses a comma-separated list of {@code <node-name>:<path>} entries into file splits.
+     * Each entry is trimmed and split at its first {@code ':'}.
+     *
+     * @param fileSplits comma-separated file-split specification
+     * @return one {@link FileSplit} per entry, in input order
+     * @throws IllegalArgumentException if an entry has no {@code ':'}, or has an empty node name
+     *             or an empty path
+     */
+    static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            // idx == 0 means no node name, idx == length - 1 means no path: both are unusable
+            // and would otherwise only fail much later, when the job is scheduled or the file opened.
+            if (idx <= 0 || idx == s.length() - 1) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
+
+    /**
+     * Pins {@code op} to the nodes named by {@code splits}: partition {@code i} of the operator is
+     * constrained to the node of {@code splits[i]}.
+     *
+     * @param spec job specification the constraint is added to
+     * @param op operator to constrain
+     * @param splits file splits whose node names give the locations
+     */
+    static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+        String[] nodeNames = new String[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            nodeNames[i] = splits[i].getNodeName();
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, nodeNames);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
new file mode 100644
index 0000000..748d809
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.*;
+
+import java.util.EnumSet;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+public class Join {
+ /**
+ * Command-line options of the join client. The fields are populated by the args4j
+ * CmdLineParser in main() from the @Option annotations; options marked required = true
+ * must be supplied, the others fall back to the field initialisers shown here.
+ */
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
+ public int port = 1098;
+
+ @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
+ public String inFileCustomerSplits;
+
+ @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+ public String inFileOrderSplits;
+
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+
+ @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
+ public int numJoinPartitions = 1;
+
+ // When true, main() submits the job with JobFlag.PROFILE_RUNTIME; otherwise with no flags.
+ @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
+ public boolean profile = true;
+
+ @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+ public int tableSize = 8191;
+
+ // Compared case-insensitively in createJob against "nestedloop", "gracehash" and "hybridhash";
+ // any other value selects the in-memory hash join.
+ @Option(name = "-algo", usage = "Join types", required = true)
+ public String algo;
+
+ // Passed to the nested-loop, grace hash and hybrid hash joins (not to the in-memory hash join)
+ @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
+ public int memSize;
+
+ @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+ public int graceInputSize = 10;
+
+ @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+ public int graceRecordsPerFrame = 200;
+
+ @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+ public double graceFactor = 1.2;
+
+ // Whether group-by is processed after the join
+ @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
+ public boolean hasGroupBy = false;
+
+ // Handed to the JobSpecification constructor in createJob.
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
+ }
+
+    /**
+     * Entry point: parses the command line, builds the join job, submits it to the cluster
+     * controller and waits for it to finish. The start time, end time and elapsed time (all in
+     * milliseconds) are printed on one line to standard error.
+     *
+     * @param args command-line arguments, see {@code Options}
+     * @throws Exception if argument parsing, job construction or job execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        Options opts = new Options();
+        new CmdLineParser(opts).parseArgument(args);
+
+        IHyracksClientConnection connection = new HyracksConnection(opts.host, opts.port);
+
+        FileSplit[] customerSplits = parseFileSplits(opts.inFileCustomerSplits);
+        FileSplit[] orderSplits = parseFileSplits(opts.inFileOrderSplits);
+        FileSplit[] resultSplits = parseFileSplits(opts.outFileSplits);
+        JobSpecification spec = createJob(customerSplits, orderSplits, resultSplits, opts.numJoinPartitions,
+                opts.algo, opts.graceInputSize, opts.graceRecordsPerFrame, opts.graceFactor, opts.memSize,
+                opts.tableSize, opts.hasGroupBy, opts.frameSize);
+
+        // Only submission and execution are timed, not job construction.
+        long startMillis = System.currentTimeMillis();
+        EnumSet<JobFlag> flags = opts.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME)
+                : EnumSet.noneOf(JobFlag.class);
+        JobId submitted = connection.startJob(spec, flags);
+        connection.waitForCompletion(submitted);
+        long endMillis = System.currentTimeMillis();
+
+        System.err.println(startMillis + " " + endMillis + " " + (endMillis - startMillis));
+    }
+
+    /**
+     * Builds the job that scans the CUSTOMER and ORDERS inputs, joins them and writes the result.
+     * <p>
+     * Field 0 of the customer tuples is matched against field 1 of the order tuples. Both inputs
+     * are hash-partitioned on their join field before reaching the join operator. When
+     * {@code hasGroupBy} is set, the join output is re-partitioned and grouped on field 6 with a
+     * count aggregate, and the group-by output is written instead of the raw join output.
+     *
+     * @param customerSplits file splits of the CUSTOMER input ('|'-delimited, 8 columns)
+     * @param orderSplits file splits of the ORDERS input ('|'-delimited, 9 columns)
+     * @param resultSplits file splits the result is written to; they also place the group-by
+     *            and writer operators
+     * @param numJoinPartitions partition count of the join operator
+     * @param algo {@code "nestedloop"}, {@code "gracehash"} or {@code "hybridhash"}
+     *            (case-insensitive); any other value selects the in-memory hash join
+     * @param graceInputSize passed to the grace/hybrid hash join
+     * @param graceRecordsPerFrame passed to the grace/hybrid hash join
+     * @param graceFactor passed to the grace/hybrid hash join
+     * @param memSize passed to the nested-loop, grace hash and hybrid hash joins
+     * @param tableSize hash table size of the in-memory hash join
+     * @param hasGroupBy whether to add the group-by stage after the join
+     * @param frameSize frame size the job specification is created with
+     * @return the assembled job specification, with the file writer as its root
+     * @throws HyracksDataException declared for the operator constructors
+     */
+    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize,
+            int graceRecordsPerFrame, double graceFactor, int memSize, int tableSize, boolean hasGroupBy,
+            int frameSize) throws HyracksDataException {
+        JobSpecification spec = new JobSpecification(frameSize);
+
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+
+        // ORDERS columns are all parsed as UTF-8 strings; reuse the shared nine-entry factory array
+        // from Common instead of spelling it out again.
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(Common.orderParserFactories, '|'), Common.ordersDesc);
+        createPartitionConstraint(spec, ordScanner, orderSplits);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), Common.custDesc);
+        createPartitionConstraint(spec, custScanner, customerSplits);
+
+        IOperatorDescriptor join;
+
+        if ("nestedloop".equalsIgnoreCase(algo)) {
+            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                    Common.custOrderJoinDesc, memSize, false, null);
+
+        } else if ("gracehash".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame,
+                    graceFactor, new int[] { 0 }, new int[] { 1 }, utf8HashFactories(),
+                    utf8ComparatorFactories(), Common.custOrderJoinDesc, null);
+
+        } else if ("hybridhash".equalsIgnoreCase(algo)) {
+            join = new HybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame,
+                    graceFactor, new int[] { 0 }, new int[] { 1 }, utf8HashFactories(),
+                    utf8ComparatorFactories(), Common.custOrderJoinDesc, null);
+
+        } else {
+            // Honour the caller-supplied table size (the -table-size option); it used to be
+            // silently replaced by a hard-coded 6000000 here.
+            join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 },
+                    utf8HashFactories(), utf8ComparatorFactories(), Common.custOrderJoinDesc, tableSize, null);
+        }
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
+
+        // Orders feed join input 1, partitioned on their field 1.
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 }, utf8HashFactories()));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        // Customers feed join input 0, partitioned on their field 0.
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, utf8HashFactories()));
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IOperatorDescriptor endingOp = join;
+
+        if (hasGroupBy) {
+
+            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 6 },
+                    new FieldHashPartitionComputerFactory(new int[] { 6 }, utf8HashFactories()),
+                    utf8ComparatorFactories(), new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    groupResultDesc, 16);
+            createPartitionConstraint(spec, gby, resultSplits);
+
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 6 }, utf8HashFactories()));
+            spec.connect(joinGroupConn, join, 0, gby, 0);
+
+            endingOp = gby;
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        createPartitionConstraint(spec, writer, resultSplits);
+
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+    /** Returns a fresh single-element array holding the UTF-8 string hash function factory. */
+    private static IBinaryHashFunctionFactory[] utf8HashFactories() {
+        return new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                .of(UTF8StringPointable.FACTORY) };
+    }
+
+    /** Returns a fresh single-element array holding the UTF-8 string comparator factory. */
+    private static IBinaryComparatorFactory[] utf8ComparatorFactories() {
+        return new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+    }
+
+
+
+    /**
+     * Serializable factory for {@link JoinComparator}: it remembers which binary comparator
+     * factory to use and which field of each side to compare, and creates a comparator from
+     * them on request.
+     */
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        /**
+         * @param comparatorFactory creates the byte-level comparator applied to the two fields
+         * @param leftField field index compared in the first tuple
+         * @param rightField field index compared in the second tuple
+         */
+        public JoinComparatorFactory(IBinaryComparatorFactory comparatorFactory, int leftField, int rightField) {
+            bFactory = comparatorFactory;
+            pos0 = leftField;
+            pos1 = rightField;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            // The task context is not needed; a new binary comparator is created per call.
+            IBinaryComparator fieldComparator = bFactory.createBinaryComparator();
+            return new JoinComparator(fieldComparator, pos0, pos1);
+        }
+    }
+
+    /**
+     * Compares one field of a tuple in the first accessor with one field of a tuple in the
+     * second accessor, using the supplied binary comparator on the raw field bytes.
+     */
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        /**
+         * @param fieldComparator byte-level comparator applied to the two fields
+         * @param leftField field index read from the first tuple
+         * @param rightField field index read from the second tuple
+         */
+        public JoinComparator(IBinaryComparator fieldComparator, int leftField, int rightField) {
+            bComparator = fieldComparator;
+            field0 = leftField;
+            field1 = rightField;
+        }
+
+        /**
+         * @return the binary comparator's result for field {@code field0} of tuple {@code tIndex0}
+         *         versus field {@code field1} of tuple {@code tIndex1}
+         */
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+                throws HyracksDataException {
+            // Tuple start plus the length of the field-slot area; the field start offsets read
+            // below are added to this base to address the bytes in the backing array.
+            int dataBase0 = accessor0.getTupleStartOffset(tIndex0) + accessor0.getFieldSlotsLength();
+            int dataBase1 = accessor1.getTupleStartOffset(tIndex1) + accessor1.getFieldSlotsLength();
+
+            int relStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int length0 = accessor0.getFieldEndOffset(tIndex0, field0) - relStart0;
+
+            int relStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int length1 = accessor1.getFieldEndOffset(tIndex1, field1) - relStart1;
+
+            return bComparator.compare(accessor0.getBuffer().array(), dataBase0 + relStart0, length0,
+                    accessor1.getBuffer().array(), dataBase1 + relStart1, length1);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
deleted file mode 100644
index 1d4e6ce..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.examples.tpch.client;
-
-import java.io.File;
-import java.util.EnumSet;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
-
-public class Main {
- private static class Options {
- @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
- public String host;
-
- @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
- public int port = 1098;
-
- @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
- public String inFileCustomerSplits;
-
- @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
- public String inFileOrderSplits;
-
- @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
- public String outFileSplits;
-
- @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
- public int numJoinPartitions = 1;
-
- @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
- public boolean profile = true;
-
- @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
- public int tableSize = 8191;
-
- @Option(name = "-algo", usage = "Join types", required = true)
- public String algo;
-
- // For grace/hybrid hash join only
- @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
- public int memSize;
-
- @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
- public int graceInputSize = 10;
-
- @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
- public int graceRecordsPerFrame = 200;
-
- @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
- public double graceFactor = 1.2;
-
- // Whether group-by is processed after the join
- @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
- public boolean hasGroupBy = false;
-
- @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
- public int frameSize = 32768;
- }
-
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
-
- IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
- JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
- parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
- options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
-
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(job,
- options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
-
- private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
- FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
- double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
- throws HyracksDataException {
- JobSpecification spec = new JobSpecification(frameSize);
-
- IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
- createPartitionConstraint(spec, ordScanner, orderSplits);
-
- FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
- createPartitionConstraint(spec, custScanner, customerSplits);
-
- IOperatorDescriptor join;
-
- if ("nestedloop".equalsIgnoreCase(algo)) {
- join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc,
- memSize, false, null);
-
- } else if ("gracehash".equalsIgnoreCase(algo)) {
- join = new GraceHashJoinOperatorDescriptor(
- spec,
- memSize,
- graceInputSize,
- graceRecordsPerFrame,
- graceFactor,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
-
- } else if ("hybridhash".equalsIgnoreCase(algo)) {
- join = new HybridHashJoinOperatorDescriptor(
- spec,
- memSize,
- graceInputSize,
- graceRecordsPerFrame,
- graceFactor,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
-
- } else {
- join = new InMemoryHashJoinOperatorDescriptor(
- spec,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 6000000, null);
- }
-
- PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
-
- IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
- IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(custJoinConn, custScanner, 0, join, 0);
-
- IOperatorDescriptor endingOp = join;
-
- if (hasGroupBy) {
-
- RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
- HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
- spec,
- new int[] { 6 },
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- groupResultDesc, 16);
- createPartitionConstraint(spec, gby, resultSplits);
-
- IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(joinGroupConn, join, 0, gby, 0);
-
- endingOp = gby;
- }
-
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
- FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
- createPartitionConstraint(spec, writer, resultSplits);
-
- IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
-
- spec.addRoot(writer);
- return spec;
- }
-
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
- String[] parts = new String[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- parts[i] = splits[i].getNodeName();
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
- }
-
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
-
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
-
- @Override
- public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
- }
- }
-
- static class JoinComparator implements ITuplePairComparator {
-
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
-
- public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
-
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
- throws HyracksDataException {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
-
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
-
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
new file mode 100644
index 0000000..7570b0b
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.ordersDesc;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.parseFileSplits;
+
+import java.util.EnumSet;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
+
+public class Sort {
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
+ public int port = 1098;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
+
+ @Option(name = "-frame-limit", usage = "memory limit for sorting (default: 4)", required = false)
+ public int frameLimit = 4;
+
+ @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+ public String inFileOrderSplits;
+
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+
+ @Option(name = "-membuffer-alg", usage = "bestfit or lastfit (default: lastfit)", required = false)
+ public String memBufferAlg = "lastfit";
+
+ @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
+ public boolean profile = true;
+
+ @Option(name = "-topK", usage = "only output topK for each node. (default: not set)")
+ public int topK = Integer.MAX_VALUE;
+
+ @Option(name = "-heapSort", usage = "using heap sort for topK result. (default: false)")
+ public boolean usingHeapSorter = false;
+ }
+
+ static int[] SortFields = new int[] { 1, 0 };
+ static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+ static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ if (args.length == 0) {
+ parser.printUsage(System.err);
+ return;
+ }
+ parser.parseArgument(args);
+
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+ JobSpecification job = createJob(parseFileSplits(options.inFileOrderSplits),
+ parseFileSplits(options.outFileSplits),
+ options.memBufferAlg, options.frameLimit, options.frameSize, options.topK, options.usingHeapSorter);
+
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob(job,
+ options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println("finished in:" + (end - start) + "ms");
+ }
+
+ private static JobSpecification createJob(FileSplit[] ordersSplits, FileSplit[] outputSplit, String memBufferAlg,
+ int frameLimit, int frameSize, int limit, boolean usingHeapSorter) {
+ JobSpecification spec = new JobSpecification();
+
+ spec.setFrameSize(frameSize);
+ IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+ new DelimitedDataTupleParserFactory(orderParserFactories, '|'), ordersDesc);
+ createPartitionConstraint(spec, ordScanner, ordersSplits);
+ AbstractSorterOperatorDescriptor sorter;
+ if (usingHeapSorter && limit < Integer.MAX_VALUE) {
+ sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null,
+ SortFieldsComparatorFactories, ordersDesc);
+ } else {
+ if (memBufferAlg.equalsIgnoreCase("bestfit")) {
+ sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields,
+ null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
+ EnumFreeSlotPolicy.SMALLEST_FIT, limit);
+ } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
+ sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+ SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.BIGGEST_FIT,
+ limit);
+ } else {
+ sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+ SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT,
+ limit);
+
+ }
+ }
+ createPartitionConstraint(spec, sorter, ordersSplits);
+ IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(outputSplit);
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+ createPartitionConstraint(spec, printer, outputSplit);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+ spec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(SortFields, orderBinaryHashFunctionFactories),
+ SortFields, SortFieldsComparatorFactories, new UTF8StringNormalizedKeyComputerFactory()),
+ sorter, 0, printer, 0);
+
+ spec.addRoot(printer);
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 4e48e9b..cb1ca87 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -76,7 +76,7 @@ public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorD
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
- private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
private ClassLoader ctxCL;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index 92cde9d..62cd76a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -15,12 +15,11 @@
package edu.uci.ics.hyracks.hdfs.lib;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -37,9 +36,7 @@ public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWri
throws HyracksDataException {
final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- final ByteBuffer buffer = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(buffer, true);
+ final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
return new IKeyValueParser<LongWritable, Text>() {
@@ -53,18 +50,13 @@ public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWri
throws HyracksDataException {
tb.reset();
tb.addField(value.getBytes(), 0, value.getLength());
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(buffer, writer);
- appender.reset(buffer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("tuple cannot be appended into the frame");
- }
- }
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
@Override
public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(buffer, writer);
+ appender.flush(writer, false);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 068cdfc..0d24ee5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -75,7 +75,7 @@ public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorD
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
- private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
private ClassLoader ctxCL;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
index e60026e..785258f 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
@@ -17,7 +17,6 @@ package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -64,14 +63,8 @@ public class BTreeUpdateSearchOperatorNodePushable extends BTreeSearchOperatorNo
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 9f3c5dd..9aec4cb 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -66,7 +66,7 @@ public class IndexBulkLoadOperatorNodePushable extends
public void open() throws HyracksDataException {
RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(recDesc);
indexHelper.open();
index = indexHelper.getIndexInstance();
try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 092fada..48a395b 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -16,6 +16,8 @@ package edu.uci.ics.hyracks.storage.am.common.dataflow;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +47,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
protected FrameTupleAccessor accessor;
protected FrameTupleReference frameTuple;
- protected ByteBuffer writeBuffer;
+ protected IFrame writeBuffer;
protected IIndexAccessor indexAccessor;
protected ITupleFilter tupleFilter;
protected IModificationOperationCallback modCallback;
@@ -63,8 +65,8 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
@Override
public void open() throws HyracksDataException {
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
- writeBuffer = ctx.allocateFrame();
+ accessor = new FrameTupleAccessor(inputRecDesc);
+ writeBuffer = new VSizeFrame(ctx);
writer.open();
indexHelper.open();
IIndex index = indexHelper.getIndexInstance();
@@ -134,8 +136,9 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
}
}
// Pass a copy of the frame to next op.
- System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
- FrameUtils.flushFrame(writeBuffer, writer);
+ writeBuffer.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+ FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index a3f4e6f..fd727cf 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -18,6 +18,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -45,7 +46,6 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
protected final IIndexDataflowHelper indexHelper;
protected FrameTupleAccessor accessor;
- protected ByteBuffer writeBuffer;
protected FrameTupleAppender appender;
protected ArrayTupleBuilder tb;
protected DataOutput dos;
@@ -103,7 +103,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ accessor = new FrameTupleAccessor(inputRecDesc);
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
@@ -126,11 +126,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
try {
searchPred = createSearchPredicate();
- writeBuffer = ctx.allocateFrame();
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(writeBuffer, true);
+ appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexHelper.getResourceID(), ctx);
indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
@@ -162,27 +160,13 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
if (!matched && retainInput && retainNull) {
- if (!appender.appendConcat(accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
- nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.appendConcat(accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
- nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
- throw new HyracksDataException("Record size larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
+ nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
}
}
@@ -205,9 +189,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
@Override
public void close() throws HyracksDataException {
try {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
+ appender.flush(writer, true);
try {
cursor.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index cb6270f..0db195d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -15,8 +15,8 @@
package edu.uci.ics.hyracks.storage.am.common.dataflow;
import java.io.DataOutput;
-import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -60,9 +60,7 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
try {
indexAccessor.diskOrderScan(cursor);
int fieldCount = treeIndex.getFieldCount();
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
+ FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
DataOutput dos = tb.getDataOutput();
@@ -72,21 +70,15 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
ITupleReference frameTuple = cursor.getTuple();
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+ frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
}
+ appender.flush(writer, true);
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 717d326..f340142 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -15,14 +15,13 @@
package edu.uci.ics.hyracks.storage.am.common.dataflow;
import java.io.DataOutput;
-import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -70,18 +69,18 @@ public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourc
.getInteriorFrameFactory().createFrame(), treeIndex.getFreePageManager().getMetaDataFrameFactory()
.createFrame());
// Write the stats output as a single string field.
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
+ FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
DataOutput dos = tb.getDataOutput();
tb.reset();
UTF8StringSerializerDeserializer.INSTANCE.serialize(stats.toString(), dos);
tb.addFieldEndOffset();
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+ throw new HyracksDataException(
+ "Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity()
+ + ")");
}
- FrameUtils.flushFrame(frame, writer);
+ appender.flush(writer, false);
} catch (Exception e) {
writer.fail();
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 5bf52e4..fd693cf 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -30,6 +30,12 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
protected FrameTupleAppender appender;
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ appender = new FrameTupleAppender(writeBuffer);
+ }
+
public LSMIndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
@@ -85,8 +91,8 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
break;
}
default: {
- throw new HyracksDataException("Unsupported operation " + op
- + " in tree index InsertUpdateDelete operator");
+ throw new HyracksDataException(
+ "Unsupported operation " + op + " in tree index InsertUpdateDelete operator");
}
}
} catch (HyracksDataException e) {
@@ -97,8 +103,9 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
}
if (nextFlushTupleIndex == 0) {
// No partial flushing was necessary. Forward entire frame.
- System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
- FrameUtils.flushFrame(writeBuffer, writer);
+ writeBuffer.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+ FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
} else {
// Flush remaining partial frame.
flushPartialFrame(nextFlushTupleIndex, tupleCount);
@@ -106,17 +113,9 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
}
private void flushPartialFrame(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
- if (appender == null) {
- appender = new FrameTupleAppender(ctx.getFrameSize());
- }
- appender.reset(writeBuffer, true);
for (int i = startTupleIndex; i < endTupleIndex; i++) {
- if (!appender.append(accessor, i)) {
- throw new HyracksDataException("Record size ("
- + (accessor.getTupleEndOffset(i) - accessor.getTupleStartOffset(i))
- + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
- }
+ FrameUtils.appendToWriter(writer, appender, accessor, i);
}
- FrameUtils.flushFrame(writeBuffer, writer);
+ appender.flush(writer, true);
}
}