You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:15 UTC
[02/51] [partial] incubator-asterixdb-hyracks git commit: Change
folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
deleted file mode 100644
index b328aab..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
-
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
- protected RecordDescriptor inputRecordDesc;
-
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalStateException();
- }
-
- @Override
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
- this.inputRecordDesc = recordDescriptor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
deleted file mode 100644
index 8e71cd7..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInputPushRuntime {
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
deleted file mode 100644
index 33f28c3..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SinkRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- public SinkRuntimeFactory() {
- }
-
- @Override
- public String toString() {
- return "sink";
- }
-
- @Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
- return new AbstractOneInputSinkPushRuntime() {
-
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
deleted file mode 100644
index 3380d08..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.group;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-
-public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
- private final int[] groupFields;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IAggregatorDescriptorFactory aggregatorFactory;
- private final RecordDescriptor inRecordDesc;
- private final RecordDescriptor outRecordDesc;
-
- public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDesc, int[] projectionList) {
- super(projectionList);
- // Obs: the projection list is currently ignored.
- if (projectionList != null) {
- throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
- }
- this.groupFields = groupFields;
- this.comparatorFactories = comparatorFactories;
- this.aggregatorFactory = aggregatorFactory;
- this.inRecordDesc = inRecordDesc;
- this.outRecordDesc = outRecordDesc;
- }
-
- @Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- return new AbstractOneInputOneOutputPushRuntime() {
-
- private PreclusteredGroupWriter pgw;
-
- @Override
- public void open() throws HyracksDataException {
- pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
- outRecordDesc, writer);
- pgw.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- pgw.nextFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- pgw.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- pgw.close();
- }
- };
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
deleted file mode 100644
index 06ef109..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
-
-import java.nio.ByteBuffer;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- // array of factories for building the local runtime pipeline
- private final AlgebricksPipeline pipeline;
-
- public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
- IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
- super(spec, inputArity, outputArity);
- if (outputArity == 1) {
- this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
- }
- this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
- }
-
- public AlgebricksPipeline getPipeline() {
- return pipeline;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject json = super.toJSON();
- json.put("micro-operators", pipeline.getRuntimeFactories());
- return json;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Asterix { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- // sb.append(super.getInputArity());
- // sb.append(";");
- // sb.append(super.getOutputArity());
- // sb.append(";");
- return sb.toString();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- if (inputArity == 0) {
- return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- } else {
- return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- }
- }
-
- private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
- public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
-
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- startOfPipeline.open();
- startOfPipeline.close();
- }
- };
- }
-
- private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-
- private IFrameWriter startOfPipeline;
-
- @Override
- public void open() throws HyracksDataException {
- if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
- pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- startOfPipeline.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- startOfPipeline.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- startOfPipeline.close();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- startOfPipeline.fail();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
deleted file mode 100644
index 5bcf933..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class PipelineAssembler {
-
- // array of factories for building the local runtime pipeline
- private final RecordDescriptor pipelineInputRecordDescriptor;
- private final RecordDescriptor pipelineOutputRecordDescriptor;
-
- private final int inputArity;
- private final int outputArity;
- private final AlgebricksPipeline pipeline;
-
- public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
- RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
- this.pipeline = pipeline;
- this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
- this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
- this.inputArity = inputArity;
- this.outputArity = outputArity;
- }
-
- public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws AlgebricksException,
- HyracksDataException {
- // plug the operators
- IFrameWriter start = writer;// this.writer;
- for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
- IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
- if (i == pipeline.getRuntimeFactories().length - 1) {
- if (outputArity == 1) {
- newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
- }
- } else {
- newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
- }
- if (i > 0) {
- newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
- } else if (inputArity > 0) {
- newRuntime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor);
- }
- start = newRuntime;
- }
- return start;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
deleted file mode 100644
index cf40669..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
-
-import java.io.DataOutput;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final AlgebricksPipeline pipeline;
- private final RecordDescriptor inputRecordDesc;
- private final INullWriterFactory[] nullWriterFactories;
-
- public SubplanRuntimeFactory(AlgebricksPipeline pipeline, INullWriterFactory[] nullWriterFactories,
- RecordDescriptor inputRecordDesc, int[] projectionList) {
- super(projectionList);
- this.pipeline = pipeline;
- this.nullWriterFactories = nullWriterFactories;
- this.inputRecordDesc = inputRecordDesc;
- if (projectionList != null) {
- throw new NotImplementedException();
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Subplan { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- return sb.toString();
- }
-
- @Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException, HyracksDataException {
-
- RecordDescriptor pipelineOutputRecordDescriptor = null;
-
- final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc,
- pipelineOutputRecordDescriptor);
- final INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
- for (int i = 0; i < nullWriterFactories.length; i++) {
- nullWriters[i] = nullWriterFactories[i].createNullWriter();
- }
-
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
-
- /**
- * Computes the outer product between a given tuple and the frames
- * passed.
- */
- class TupleOuterProduct implements IFrameWriter {
-
- private boolean smthWasWritten = false;
- private FrameTupleAccessor ta = new FrameTupleAccessor(
- pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
- private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
-
- @Override
- public void open() throws HyracksDataException {
- smthWasWritten = false;
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ta.reset(buffer);
- int nTuple = ta.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t);
- }
- smthWasWritten = true;
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (!smthWasWritten) {
- // the case when we need to write nulls
- appendNullsToTuple();
- appendToFrameFromTupleBuilder(tb);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- private void appendNullsToTuple() throws HyracksDataException {
- tb.reset();
- int n0 = tRef.getFieldCount();
- for (int f = 0; f < n0; f++) {
- tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < nullWriters.length; i++) {
- nullWriters[i].writeNull(dos);
- tb.addFieldEndOffset();
- }
- }
-
- }
-
- IFrameWriter endPipe = new TupleOuterProduct();
-
- NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
-
- boolean first = true;
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- first = false;
- initAccessAppendRef(ctx);
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- startOfPipeline.writeTuple(buffer, t);
- startOfPipeline.open();
- startOfPipeline.close();
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
deleted file mode 100644
index d3751f5..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
-import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
-import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
-import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
-
-public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final int[] sortFields;
- private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private IBinaryComparatorFactory[] comparatorFactories;
-
- public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
- super(projectionList);
- // Obs: the projection list is currently ignored.
- if (projectionList != null) {
- throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
- }
- this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
- this.comparatorFactories = comparatorFactories;
- }
-
- @Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
-
- return new AbstractOneInputOneOutputPushRuntime() {
-
- FrameSorterMergeSort frameSorter = null;
-
- @Override
- public void open() throws HyracksDataException {
- if (frameSorter == null) {
- IFrameBufferManager manager = new VariableFrameMemoryManager(
- new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
- new FrameFreeSlotLastFit());
- frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, outputRecordDesc);
- }
- frameSorter.reset();
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- frameSorter.insertFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- frameSorter.sort();
- frameSorter.flush(writer);
- writer.close();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
deleted file mode 100644
index fb889ea..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private int[] outColumns;
- private IScalarEvaluatorFactory[] evalFactories;
- private final boolean flushFramesRapidly;
-
- /**
- * @param outColumns
- * a sorted array of columns into which the result is written to
- * @param evalFactories
- * @param projectionList
- * an array of columns to be projected
- */
-
- public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
- this(outColumns, evalFactories, projectionList, false);
- }
-
- public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList,
- boolean flushFramesRapidly) {
- super(projectionList);
- this.outColumns = outColumns;
- this.evalFactories = evalFactories;
- this.flushFramesRapidly = flushFramesRapidly;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("assign [");
- for (int i = 0; i < outColumns.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(outColumns[i]);
- }
- sb.append("] := [");
- for (int i = 0; i < evalFactories.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(evalFactories[i]);
- }
- sb.append("]");
- return sb.toString();
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
- final int[] projectionToOutColumns = new int[projectionList.length];
- for (int j = 0; j < projectionList.length; j++) {
- projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
- }
-
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable result = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- private boolean first = true;
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- initAccessAppendRef(ctx);
- first = false;
- int n = evalFactories.length;
- for (int i = 0; i < n; i++) {
- try {
- eval[i] = evalFactories[i].createScalarEvaluator(ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- int t = 0;
- if (nTuple > 1) {
- for (; t < nTuple - 1; t++) {
- tRef.reset(tAccess, t);
- produceTuple(tupleBuilder, tAccess, t, tRef);
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
- }
-
- tRef.reset(tAccess, t);
- produceTuple(tupleBuilder, tAccess, t, tRef);
- if (flushFramesRapidly) {
- // Whenever all the tuples in the incoming frame have been consumed, the assign operator
- // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
- appendToFrameFromTupleBuilder(tupleBuilder, true);
- } else {
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
- }
-
- private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
- FrameTupleReference tupleRef) throws HyracksDataException {
- tb.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int k = projectionToOutColumns[f];
- if (k >= 0) {
- try {
- eval[k].evaluate(tupleRef, result);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
- } else {
- tb.addField(accessor, tIndex, projectionList[f]);
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
deleted file mode 100644
index 35fcafc..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
-import edu.uci.ics.hyracks.api.comm.VSizeFrame;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- public EmptyTupleSourceRuntimeFactory() {
- }
-
- @Override
- public String toString() {
- return "ets";
- }
-
- @Override
- public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
- return new AbstractOneInputSourcePushRuntime() {
-
- private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
- private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- appender.flush(writer, true);
- writer.close();
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
deleted file mode 100644
index 8df87ab..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- public NestedTupleSourceRuntimeFactory() {
- }
-
- @Override
- public String toString() {
- return "nts";
- }
-
- @Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
- return new NestedTupleSourceRuntime(ctx);
- }
-
- public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-
- public NestedTupleSourceRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
- initAccessAppend(ctx);
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
- tAccess.reset(inputBuffer);
- appendTupleToFrame(tIndex);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- public void forceFlush() throws HyracksDataException {
- appender.flush(writer, true);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
deleted file mode 100644
index 1d2d27c..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.comm.IFrame;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.VSizeFrame;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-
-public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- public static int NO_DEFAULT_BRANCH = -1;
-
- private final ICopyEvaluatorFactory[] evalFactories;
- private final IBinaryBooleanInspector boolInspector;
- private final int defaultBranchIndex;
-
- public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
- IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
- super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
- for (int i = 0; i < evalFactories.length; i++) {
- recordDescriptors[i] = rDesc;
- }
- this.evalFactories = evalFactories;
- this.boolInspector = boolInspector;
- this.defaultBranchIndex = defaultBranchIndex;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryInputOperatorNodePushable() {
- private final IFrameWriter[] writers = new IFrameWriter[outputArity];
- private final IFrame[] writeBuffers = new IFrame[outputArity];
- private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
- private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
- private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
- 0);
- private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
- private final FrameTupleReference frameTuple = new FrameTupleReference();
-
- private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
- private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
- private final DataOutput tupleDos = tupleBuilder.getDataOutput();
-
- @Override
- public void close() throws HyracksDataException {
- // Flush (possibly not full) buffers that have data, and close writers.
- for (int i = 0; i < outputArity; i++) {
- tupleAppender.reset(writeBuffers[i], false);
- // ? by JF why didn't clear the buffer ?
- tupleAppender.flush(writers[i], false);
- writers[i].close();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- frameTuple.reset(accessor, i);
- boolean found = false;
- for (int j = 0; j < evals.length; j++) {
- try {
- evalBuf.reset();
- evals[j].evaluate(frameTuple);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
- if (found) {
- copyAndAppendTuple(j);
- break;
- }
- }
- // Optionally write to default output branch.
- if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
- copyAndAppendTuple(defaultBranchIndex);
- }
- }
- }
-
- private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
- // Copy tuple into tuple builder.
- try {
- tupleBuilder.reset();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
- frameTuple.getFieldLength(i));
- tupleBuilder.addFieldEndOffset();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
- }
-
- @Override
- public void open() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.open();
- }
- // Create write buffers.
- for (int i = 0; i < outputArity; i++) {
- writeBuffers[i] = new VSizeFrame(ctx);
- // Make sure to clear all buffers, since we are reusing the tupleAppender.
- tupleAppender.reset(writeBuffers[i], true);
- }
- // Create evaluators for partitioning.
- try {
- for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createEvaluator(evalBuf);
- }
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- writers[index] = writer;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
deleted file mode 100644
index c2ecc56..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import edu.uci.ics.hyracks.algebricks.data.IAWriter;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class PrinterRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final int[] printColumns;
- private final IPrinterFactory[] printerFactories;
- private final RecordDescriptor inputRecordDesc;
-
- public PrinterRuntimeFactory(int[] printColumns, IPrinterFactory[] printerFactories,
- RecordDescriptor inputRecordDesc) {
- this.printColumns = printColumns;
- this.printerFactories = printerFactories;
- this.inputRecordDesc = inputRecordDesc;
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder();
- buf.append("print [");
- for (int i = 0; i < printColumns.length; i++) {
- if (i > 0) {
- buf.append("; ");
- }
- buf.append(printColumns[i]);
- }
- buf.append("]");
- return buf.toString();
- }
-
- @Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
- IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories,
- inputRecordDesc);
- return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
deleted file mode 100644
index 154f2d1..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final int[] outColumns;
- private final IRunningAggregateEvaluatorFactory[] runningAggregates;
-
- /**
- * @param outColumns
- * a sorted array of columns into which the result is written to
- * @param runningAggregates
- * @param projectionList
- * an array of columns to be projected
- */
-
- public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] runningAggregates,
- int[] projectionList) {
- super(projectionList);
- this.outColumns = outColumns;
- this.runningAggregates = runningAggregates;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("running-aggregate [");
- for (int i = 0; i < outColumns.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(outColumns[i]);
- }
- sb.append("] := [");
- for (int i = 0; i < runningAggregates.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(runningAggregates[i]);
- }
- sb.append("]");
- return sb.toString();
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
- final int[] projectionToOutColumns = new int[projectionList.length];
- for (int j = 0; j < projectionList.length; j++) {
- projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
- }
-
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private final IPointable p = VoidPointable.FACTORY.createPointable();
- private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
- private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- private boolean first = true;
-
- @Override
- public void open() throws HyracksDataException {
- initAccessAppendRef(ctx);
- if (first) {
- first = false;
- int n = runningAggregates.length;
- for (int i = 0; i < n; i++) {
- try {
- raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- }
- for (int i = 0; i < runningAggregates.length; i++) {
- try {
- raggs[i].init();
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- produceTuple(tupleBuilder, tAccess, t, tRef);
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
- }
-
- private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
- FrameTupleReference tupleRef) throws HyracksDataException {
- tb.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int k = projectionToOutColumns[f];
- if (k >= 0) {
- try {
- raggs[k].step(tupleRef, p);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- } else {
- tb.addField(accessor, tIndex, projectionList[f]);
- }
- }
- }
-
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
deleted file mode 100644
index d26d090..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IAWriter;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
-
- private final IHyracksTaskContext ctx;
- private final PrintStream printStream;
- private final IAWriter writer;
- private RecordDescriptor inputRecordDesc;
- private FrameTupleAccessor tAccess;
- private boolean autoClose = false;
- private boolean first = true;
-
- public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
- RecordDescriptor inputRecordDesc) {
- this.writer = writer;
- this.ctx = ctx;
- this.printStream = printStream;
- this.inputRecordDesc = inputRecordDesc;
- this.tAccess = new FrameTupleAccessor(inputRecordDesc);
- }
-
- public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
- RecordDescriptor inputRecordDesc, boolean autoClose) {
- this(writer, ctx, printStream, inputRecordDesc);
- this.autoClose = autoClose;
- }
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- first = false;
- tAccess = new FrameTupleAccessor(inputRecordDesc);
- try {
- writer.init();
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- try {
- writer.printTuple(tAccess, t);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (autoClose) {
- printStream.close();
- }
- }
-
- @Override
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
- this.inputRecordDesc = recordDescriptor;
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
deleted file mode 100644
index a98bef3..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IAWriter;
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final int[] fields;
- private final IPrinterFactory[] printerFactories;
- private final File outputFile;
- private final RecordDescriptor inputRecordDesc;
- private final IAWriterFactory writerFactory;
-
- public SinkWriterRuntimeFactory(int[] fields, IPrinterFactory[] printerFactories, File outputFile,
- IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc) {
- this.fields = fields;
- this.printerFactories = printerFactories;
- this.outputFile = outputFile;
- this.writerFactory = writerFactory;
- this.inputRecordDesc = inputRecordDesc;
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder();
- buf.append("sink-write " + "[");
- for (int i = 0; i < fields.length; i++) {
- if (i > 0) {
- buf.append("; ");
- }
- buf.append(fields[i]);
- }
- buf.append("] outputFile");
- return buf.toString();
- }
-
- @Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
- PrintStream filePrintStream = null;
- try {
- filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
- } catch (FileNotFoundException e) {
- throw new AlgebricksException(e);
- }
- IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
- return new SinkWriterRuntime(w, ctx, filePrintStream, inputRecordDesc, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
deleted file mode 100644
index 7d128aa..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-
-public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final IScalarEvaluatorFactory maxObjectsEvalFactory;
- private final IScalarEvaluatorFactory offsetEvalFactory;
- private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
- public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
- IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
- IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
- super(projectionList);
- this.maxObjectsEvalFactory = maxObjectsEvalFactory;
- this.offsetEvalFactory = offsetEvalFactory;
- this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
- }
-
- @Override
- public String toString() {
- String s = "stream-limit " + maxObjectsEvalFactory.toString();
- if (offsetEvalFactory != null) {
- return s + ", " + offsetEvalFactory.toString();
- } else {
- return s;
- }
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
- final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private final IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator evalMaxObjects;
- private IScalarEvaluator evalOffset = null;
- private int toWrite = 0; // how many tuples still to write
- private int toSkip = 0; // how many tuples still to skip
- private boolean firstTuple = true;
- private boolean afterLastTuple = false;
-
- @Override
- public void open() throws HyracksDataException {
- // if (first) {
- if (evalMaxObjects == null) {
- initAccessAppendRef(ctx);
- try {
- evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
- if (offsetEvalFactory != null) {
- evalOffset = offsetEvalFactory.createScalarEvaluator(ctx);
- }
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- writer.open();
- afterLastTuple = false;
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (afterLastTuple) {
- // ignore the data
- return;
- }
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- int start = 0;
- if (nTuple <= toSkip) {
- toSkip -= nTuple;
- return;
- } else if (toSkip > 0) {
- start = toSkip;
- toSkip = 0;
- }
- for (int t = start; t < nTuple; t++) {
- if (firstTuple) {
- firstTuple = false;
- toWrite = evaluateInteger(evalMaxObjects, t);
- if (evalOffset != null) {
- toSkip = evaluateInteger(evalOffset, t);
- }
- }
- if (toSkip > 0) {
- toSkip--;
- } else if (toWrite > 0) {
- toWrite--;
- if (projectionList != null) {
- appendProjectionToFrame(t, projectionList);
- } else {
- appendTupleToFrame(t);
- }
- } else {
- // close();
- afterLastTuple = true;
- break;
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- toWrite = 0; // how many tuples still to write
- toSkip = 0; // how many tuples still to skip
- firstTuple = true;
- afterLastTuple = false;
- // if (!afterLastTuple) {
- super.close();
- // }
- }
-
- private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
- tRef.reset(tAccess, tIdx);
- try {
- eval.evaluate(tRef, p);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
- return lim;
- }
-
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
deleted file mode 100644
index c0ae029..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
- private final boolean flushFramesRapidly;
-
- public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
- super(projectionList);
- this.flushFramesRapidly = flushFramesRapidly;
- }
-
- public StreamProjectRuntimeFactory(int[] projectionList) {
- this(projectionList, false);
- }
-
- @Override
- public String toString() {
- return "stream-project " + Arrays.toString(projectionList);
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
-
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
-
- private boolean first = true;
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- first = false;
- initAccessAppend(ctx);
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
-
- int t = 0;
- if (nTuple > 1) {
- for (; t < nTuple - 1; t++) {
- appendProjectionToFrame(t, projectionList);
- }
- }
- if (flushFramesRapidly) {
- // Whenever all the tuples in the incoming frame have been consumed, the project operator
- // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
- appendProjectionToFrame(t, projectionList, true);
- } else {
- appendProjectionToFrame(t, projectionList);
- }
-
- }
-
- };
- }
-}