You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:16 UTC
[18/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
new file mode 100755
index 0000000..9f14ec0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.IAObject;
+
+/**
+ * Collects a single result value in one of several shapes: int, float, double,
+ * string, record or ordered list.
+ * <p>
+ * NOTE(review): only the signatures are visible here; where and how a value is
+ * serialized is decided by the implementing class - confirm there.
+ */
+public interface IResultCollector {
+
+    /** Writes an {@code int} result. */
+    void writeIntResult(int result) throws AsterixException;
+
+    /** Writes a {@code float} result. */
+    void writeFloatResult(float result) throws AsterixException;
+
+    /** Writes a {@code double} result. */
+    void writeDoubleResult(double result) throws AsterixException;
+
+    /** Writes a {@link String} result. */
+    void writeStringResult(String result) throws AsterixException;
+
+    /** Writes a record result. */
+    void writeRecordResult(ARecord result) throws AsterixException;
+
+    /** Writes an ordered-list result. */
+    void writeListResult(AOrderedList list) throws AsterixException;
+
+    /**
+     * Returns an object that can hold a complex-typed result.
+     * NOTE(review): presumably a reusable holder for record/list results - confirm.
+     */
+    IAObject getComplexTypeResultHolder();
+
+    /**
+     * Returns the {@link DataOutput} associated with this collector.
+     * NOTE(review): presumably the sink the write methods serialize into - confirm.
+     */
+    DataOutput getDataOutput();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
new file mode 100644
index 0000000..31d6317
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.InputStream;
+
+/**
+ * A data parser whose input is an {@link InputStream}.
+ */
+public interface IStreamDataParser extends IDataParser {
+
+    /**
+     * Sets the input stream for the parser. Called only for parsers that support
+     * input streams.
+     *
+     * @param in
+     *            the stream the parser should read from
+     */
+    void setInputStream(InputStream in) throws Exception;
+
+    /**
+     * Parses data into output AsterixDataModel binary records.
+     * Used with parsers that support stream sources.
+     *
+     * @param out
+     *            DataOutput instance used for writing the parser output.
+     * @return NOTE(review): presumably {@code true} when a record was written to
+     *         {@code out} and {@code false} once the input is exhausted - confirm
+     *         against the implementations
+     */
+    boolean parse(DataOutput out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
new file mode 100644
index 0000000..828f71e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Data parser factory that can create {@link IStreamDataParser} instances.
+ */
+public interface IStreamDataParserFactory extends IDataParserFactory {
+
+    /**
+     * Creates a stream parser for the given task context and partition.
+     *
+     * @param ctx
+     *            the Hyracks task context the parser is created for
+     * @param partition
+     *            the partition number the parser is created for
+     * @return a new stream data parser
+     */
+    IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException, AsterixException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
new file mode 100644
index 0000000..d368c48
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+/**
+ * A data flow controller that is driven by an {@link IStreamDataParser}.
+ */
+public interface IStreamFlowController extends IDataFlowController {
+
+    /**
+     * Supplies the stream parser this controller should use.
+     *
+     * @param dataParser
+     *            the parser to use
+     */
+    void setStreamParser(IStreamDataParser dataParser);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
new file mode 100644
index 0000000..d06161e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for data flow controllers. It only stores the collaborators handed
+ * to it (tuple forwarder, task context, configuration map) and offers a helper
+ * to initialize the forwarder.
+ */
+public abstract class AbstractDataFlowController implements IDataFlowController {
+
+    protected ITupleForwarder tupleForwarder;
+    protected IHyracksTaskContext ctx;
+    protected Map<String, String> configuration;
+
+    /**
+     * Remembers the configuration map and the task context for later use.
+     */
+    @Override
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws IOException {
+        this.ctx = ctx;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Sets the forwarder that output tuples are handed to.
+     */
+    @Override
+    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
+        this.tupleForwarder = tupleForwarder;
+    }
+
+    /**
+     * Returns the forwarder previously set, or {@code null} if none was set.
+     */
+    @Override
+    public ITupleForwarder getTupleForwarder() {
+        return this.tupleForwarder;
+    }
+
+    /**
+     * Initializes the current tuple forwarder with the stored task context and
+     * the given writer. A forwarder must have been set beforehand.
+     */
+    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
+        this.tupleForwarder.initialize(this.ctx, writer);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
new file mode 100644
index 0000000..116ec09
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple forwarder that buffers tuples in a frame and flushes the frame to the
+ * writer when one of the following happens:
+ * <ul>
+ * <li>the frame cannot take the next tuple,</li>
+ * <li>the frame already holds {@link #BATCH_SIZE} tuples (count-based flushing is
+ * off when the property is absent), or</li>
+ * <li>the {@link #BATCH_INTERVAL} timer fires (period in milliseconds; no timer
+ * is started when the property is absent).</li>
+ * </ul>
+ * When the timer is active, the frame, the appender and the tuple counter are
+ * only touched while holding {@code lock}, because the timer thread and the
+ * thread calling {@link #addTuple} share them.
+ */
+public class CounterTimerTupleForwarder implements ITupleForwarder {
+
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
+
+    private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwarder.class.getName());
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private int batchSize;
+    private long batchInterval;
+    private int tuplesInFrame = 0;
+    private TimeBasedFlushTask flushTask;
+    private Timer timer;
+    private final Object lock = new Object();
+    private boolean activeTimer = false;
+
+    /**
+     * Reads {@link #BATCH_SIZE} and {@link #BATCH_INTERVAL} from the configuration.
+     *
+     * @throws NumberFormatException
+     *             if a present property is not a valid number
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String propValue = configuration.get(BATCH_SIZE);
+        if (propValue != null) {
+            batchSize = Integer.parseInt(propValue);
+        } else {
+            // -1 never equals the tuple counter, so count-based flushing is disabled
+            batchSize = -1;
+        }
+
+        propValue = configuration.get(BATCH_INTERVAL);
+        if (propValue != null) {
+            batchInterval = Long.parseLong(propValue);
+            activeTimer = true;
+        }
+    }
+
+    /**
+     * Allocates the frame and, if an interval was configured, starts the periodic flush timer.
+     */
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        appender.reset(frame, true);
+        this.writer = writer;
+        if (activeTimer) {
+            this.timer = new Timer();
+            this.flushTask = new TimeBasedFlushTask();
+            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
+        }
+    }
+
+    /**
+     * Adds a tuple to the current frame, flushing the frame first when it is full
+     * or has reached the configured batch size.
+     */
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (activeTimer) {
+            // the counter update happens inside the lock so the timer thread never
+            // observes a frame whose contents and counter disagree
+            synchronized (lock) {
+                addTupleToFrame(tb);
+            }
+        } else {
+            addTupleToFrame(tb);
+        }
+    }
+
+    /** Appends the tuple and counts it; the caller holds {@code lock} when the timer is active. */
+    private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (tuplesInFrame == batchSize
+                || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("flushing frame containing (" + tuplesInFrame + ") tuples");
+            }
+            flushFrame();
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException(
+                        "could not append a tuple of size " + tb.getSize() + " to a freshly reset frame");
+            }
+        }
+        tuplesInFrame++;
+    }
+
+    /** Sends the frame to the writer and resets frame and counter; the caller holds {@code lock} when the timer is active. */
+    private void flushFrame() throws HyracksDataException {
+        FrameUtils.flushFrame(frame.getBuffer(), writer);
+        appender.reset(frame, true);
+        tuplesInFrame = 0;
+    }
+
+    /**
+     * Stops the timer and flushes any tuples still buffered.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        // Cancel first: otherwise a timed flush could still fire after the final
+        // flush below and push the same frame to the writer a second time.
+        if (timer != null) {
+            timer.cancel();
+        }
+        if (activeTimer) {
+            synchronized (lock) {
+                flushRemaining();
+            }
+        } else {
+            flushRemaining();
+        }
+    }
+
+    /** Flushes the frame if it holds tuples, leaving it empty so a repeated call is a no-op. */
+    private void flushRemaining() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            flushFrame();
+        }
+    }
+
+    /**
+     * Timer task that flushes the frame if it holds any tuples.
+     */
+    private class TimeBasedFlushTask extends TimerTask {
+
+        @Override
+        public void run() {
+            try {
+                // check and flush under the same lock so a concurrent addTuple/close
+                // cannot change the frame between the check and the flush
+                synchronized (lock) {
+                    if (tuplesInFrame > 0) {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+                        }
+                        flushFrame();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                // a TimerTask cannot propagate checked exceptions; record the failure
+                LOGGER.log(Level.WARNING, "timed flush of frame failed", e);
+            }
+        }
+
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
new file mode 100644
index 0000000..36d41b4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple forwarder that buffers tuples in a frame and hands the frame to the
+ * writer only when the next tuple no longer fits (and once more on close).
+ */
+public class FrameFullTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+
+    /** This forwarder has no configurable properties. */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        // no-op
+    }
+
+    /** Allocates the frame, remembers the writer and readies the appender. */
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    /**
+     * Appends the tuple to the frame; if it does not fit, the frame is flushed
+     * and the append is retried once on the emptied frame.
+     */
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            return;
+        }
+        // frame is full: ship it and retry on the cleared frame
+        FrameUtils.flushFrame(frame.getBuffer(), writer);
+        appender.reset(frame, true);
+        boolean appendedAfterFlush = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                tb.getSize());
+        if (!appendedAfterFlush) {
+            throw new IllegalStateException();
+        }
+    }
+
+    /** Flushes the frame if it still holds tuples. */
+    @Override
+    public void close() throws HyracksDataException {
+        boolean hasPendingTuples = appender.getTupleCount() > 0;
+        if (hasPendingTuples) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
new file mode 100644
index 0000000..68c6f9b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
+ IExternalIndexer indexer;
+
+ @Override
+ protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
+ indexer.index(tb);
+ }
+
+ @Override
+ public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
+ super.setRecordReader(recordReader);
+ indexer = ((IIndexingDatasource) recordReader).getIndexer();
+ numOfTupleFields += indexer.getNumberOfFields();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
new file mode 100644
index 0000000..99cc3d1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple forwarder that optionally sleeps for a fixed interval before accepting
+ * each tuple, and flushes its frame to the writer whenever the next tuple does
+ * not fit (and once more on close).
+ */
+public class RateControlledTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private long interTupleInterval;
+    private boolean delayConfigured;
+
+    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+
+    /**
+     * Reads the optional {@link #INTER_TUPLE_INTERVAL} property (milliseconds, as
+     * it is passed to {@link Thread#sleep(long)}). A value of 0, or an absent
+     * property on a fresh instance, disables the delay.
+     *
+     * @throws NumberFormatException
+     *             if the property is present but not a valid long
+     * @throws IllegalArgumentException
+     *             if the property is negative
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        if (propValue != null) {
+            long parsedInterval = Long.parseLong(propValue);
+            if (parsedInterval < 0) {
+                // Thread.sleep rejects negative values; report the bad setting here
+                // instead of failing on every tuple later on
+                throw new IllegalArgumentException(
+                        INTER_TUPLE_INTERVAL + " must not be negative: " + parsedInterval);
+            }
+            interTupleInterval = parsedInterval;
+        }
+        delayConfigured = interTupleInterval != 0;
+    }
+
+    /** Allocates the frame, remembers the writer and readies the appender. */
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    /**
+     * Waits for the configured interval (if any) and then appends the tuple,
+     * flushing the frame first when the tuple does not fit.
+     *
+     * @throws HyracksDataException
+     *             if flushing fails, or if the thread is interrupted while waiting;
+     *             in the latter case the interrupt flag is restored
+     */
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (delayConfigured) {
+            try {
+                Thread.sleep(interTupleInterval);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller and stop forwarding
+                Thread.currentThread().interrupt();
+                throw new HyracksDataException(e);
+            }
+        }
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+            if (!success) {
+                throw new IllegalStateException(
+                        "could not append a tuple of size " + tb.getSize() + " to a freshly reset frame");
+            }
+        }
+    }
+
+    /** Flushes the frame if it still holds tuples. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
new file mode 100644
index 0000000..ad8e791
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordFlowController;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Data flow controller that pulls records from an {@link IRecordReader}, parses
+ * each one with an {@link IRecordDataParser} into the first tuple field and hands
+ * the tuple to the tuple forwarder.
+ */
+public class RecordDataFlowController<T> extends AbstractDataFlowController implements IRecordFlowController<T> {
+
+    protected IRecordDataParser<T> dataParser;
+    protected IRecordReader<? extends T> recordReader;
+    protected int numOfTupleFields = 1;
+
+    /**
+     * Reads, parses and forwards every record, then closes the forwarder and the
+     * reader. The reader is closed even when reading, parsing or forwarding
+     * fails; a failure while closing it is then attached to the original failure
+     * as a suppressed exception.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised along the way
+     */
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        HyracksDataException failure = null;
+        try {
+            ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
+            initializeTupleForwarder(writer);
+            while (recordReader.hasNext()) {
+                IRawRecord<? extends T> record = recordReader.next();
+                tb.reset();
+                dataParser.parse(record, tb.getDataOutput());
+                tb.addFieldEndOffset();
+                appendOtherTupleFields(tb);
+                tupleForwarder.addTuple(tb);
+            }
+            tupleForwarder.close();
+        } catch (Exception e) {
+            failure = new HyracksDataException(e);
+            throw failure;
+        } finally {
+            // always release the reader, but never let a close failure hide the
+            // exception that actually stopped the flow
+            try {
+                recordReader.close();
+            } catch (Exception e) {
+                if (failure == null) {
+                    throw new HyracksDataException(e);
+                }
+                failure.addSuppressed(e);
+            }
+        }
+    }
+
+    /**
+     * Hook for subclasses that add fields after the parsed record field; does
+     * nothing by default.
+     */
+    protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /** Sets the parser used to turn raw records into tuple fields. */
+    @Override
+    public void setRecordParser(IRecordDataParser<T> dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    /** Sets the reader records are pulled from. */
+    @Override
+    public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
+        this.recordReader = recordReader;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
new file mode 100644
index 0000000..3016470
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Data flow controller that repeatedly asks an {@link IStreamDataParser} for the
+ * next record, puts it into a single-field tuple and hands the tuple to the
+ * tuple forwarder, until the parser returns {@code false}.
+ */
+public class StreamDataFlowController extends AbstractDataFlowController implements IStreamFlowController {
+    private IStreamDataParser dataParser;
+    private static final int NUMBER_OF_TUPLE_FIELDS = 1;
+
+    /**
+     * Runs the parse-and-forward loop and closes the forwarder afterwards; any
+     * exception is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+            initializeTupleForwarder(writer);
+            // the builder is reset before every parse call, including the last
+            // one that returns false
+            tupleBuilder.reset();
+            while (dataParser.parse(tupleBuilder.getDataOutput())) {
+                tupleBuilder.addFieldEndOffset();
+                tupleForwarder.addTuple(tupleBuilder);
+                tupleBuilder.reset();
+            }
+            tupleForwarder.close();
+        } catch (Exception cause) {
+            throw new HyracksDataException(cause);
+        }
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /** Sets the parser that produces the records to forward. */
+    @Override
+    public void setStreamParser(IStreamDataParser dataParser) {
+        this.dataParser = dataParser;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
deleted file mode 100644
index ede820d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-public class AzureTweetEntity extends TableServiceEntity {
-
- private String postingType;
- private String json;
-
- public AzureTweetEntity() {
- }
-
- public AzureTweetEntity(String userID, String postingID) {
- this.partitionKey = userID;
- this.rowKey = postingID;
- }
-
- public String getPostingType() {
- return postingType;
- }
-
- public void setPostingType(String postingType) {
- this.postingType = postingType;
- }
-
- public void setJSON(String json) {
- this.json = json;
- }
-
- public String getJSON() {
- return json;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
deleted file mode 100644
index ddda897..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-public class AzureTweetMetadataEntity extends TableServiceEntity {
- private String creationTimestamp;
- private String postingType;
- private String productId;
- private String ethnicity;
- private String gender;
- private String sentiment;
- private String location;
-
- public AzureTweetMetadataEntity() {
- }
-
- public AzureTweetMetadataEntity(String partitionKey, String rowKey) {
- this.partitionKey = partitionKey;
- this.rowKey = rowKey;
- }
-
- public String getCreationTimestamp() {
- return creationTimestamp;
- }
-
- public void setCreationTimestamp(String creationTimestamp) {
- this.creationTimestamp = creationTimestamp;
- }
-
- public String getPostingType() {
- return postingType;
- }
-
- public void setPostingType(String postingType) {
- this.postingType = postingType;
- }
-
- public String getProductId() {
- return productId;
- }
-
- public void setProductId(String productId) {
- this.productId = productId;
- }
-
- public String getEthnicity() {
- return ethnicity;
- }
-
- public void setEthnicity(String ethnicity) {
- this.ethnicity = ethnicity;
- }
-
- public String getGender() {
- return gender;
- }
-
- public void setGender(String gender) {
- this.gender = gender;
- }
-
- public String getSentiment() {
- return sentiment;
- }
-
- public void setSentiment(String sentiment) {
- this.sentiment = sentiment;
- }
-
- public String getLocation() {
- return location;
- }
-
- public void setLocation(String location) {
- this.location = location;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
deleted file mode 100644
index a197368..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.external.dataset.adapter.IFeedClient.InflowState;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Acts as an abstract class for all pull-based external data adapters. Captures
- * the common logic for obtaining bytes from an external source and packing them
- * into frames as tuples.
- */
-public abstract class ClientBasedFeedAdapter implements IFeedAdapter {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(ClientBasedFeedAdapter.class.getName());
- private static final int timeout = 5; // seconds
-
- protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
- protected IFeedClient pullBasedFeedClient;
- protected ARecordType adapterOutputType;
- protected boolean continueIngestion = true;
- protected Map<String, String> configuration;
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private long tupleCount = 0;
- private final IHyracksTaskContext ctx;
- private int frameTupleCount = 0;
-
- protected FeedPolicyEnforcer policyEnforcer;
-
- public FeedPolicyEnforcer getPolicyEnforcer() {
- return policyEnforcer;
- }
-
- public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
- this.policyEnforcer = policyEnforcer;
- }
-
- public abstract IFeedClient getFeedClient(int partition) throws Exception;
-
- public abstract ITupleForwardPolicy getTupleParserPolicy();
-
- public ClientBasedFeedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
- this.ctx = ctx;
- this.configuration = configuration;
- }
-
- public long getIngestedRecordsCount() {
- return tupleCount;
- }
-
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- appender = new FrameTupleAppender();
- frame = new VSizeFrame(ctx);
- appender.reset(frame, true);
- ITupleForwardPolicy policy = getTupleParserPolicy();
- policy.configure(configuration);
- pullBasedFeedClient = getFeedClient(partition);
- InflowState inflowState = null;
- policy.initialize(ctx, writer);
- while (continueIngestion) {
- tupleBuilder.reset();
- try {
- inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
- switch (inflowState) {
- case DATA_AVAILABLE:
- tupleBuilder.addFieldEndOffset();
- policy.addTuple(tupleBuilder);
- frameTupleCount++;
- break;
- case NO_MORE_DATA:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Reached end of feed");
- }
- policy.close();
- tupleCount += frameTupleCount;
- frameTupleCount = 0;
- continueIngestion = false;
- break;
- case DATA_NOT_AVAILABLE:
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
- }
- break;
- }
-
- } catch (Exception failureException) {
- try {
- failureException.printStackTrace();
- boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
- if (continueIngestion) {
- tupleBuilder.reset();
- continue;
- } else {
- throw failureException;
- }
- } catch (Exception recoveryException) {
- throw new Exception(recoveryException);
- }
- }
- }
- }
-
- /**
- * Discontinue the ingestion of data and end the feed.
- *
- * @throws Exception
- */
- @Override
- public void stop() throws Exception {
- continueIngestion = false;
- }
-
- public Map<String, String> getConfiguration() {
- return configuration;
- }
-
- @Override
- public boolean handleException(Exception e) {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
deleted file mode 100644
index e321b67..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableOrderedList;
-import org.apache.asterix.om.base.AMutablePoint;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AMutableUnorderedList;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.IACursor;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.AUnorderedListType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public abstract class FeedClient implements IFeedClient {
-
- protected static final Logger LOGGER = Logger.getLogger(FeedClient.class.getName());
-
- protected ARecordSerializerDeserializer recordSerDe;
- protected AMutableRecord mutableRecord;
- protected boolean messageReceived;
- protected boolean continueIngestion = true;
- protected IARecordBuilder recordBuilder = new RecordBuilder();
-
- protected AMutableString aString = new AMutableString("");
- protected AMutableInt32 aInt32 = new AMutableInt32(0);
- protected AMutablePoint aPoint = new AMutablePoint(0, 0);
- protected AMutableDateTime aDateTime = new AMutableDateTime(0);
-
- @SuppressWarnings("unchecked")
- protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ASTRING);
- @SuppressWarnings("unchecked")
- protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ABOOLEAN);
- @SuppressWarnings("unchecked")
- protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
-
- public abstract InflowState retrieveNextRecord() throws Exception;
-
- @Override
- public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
- try {
- InflowState state = null;
- int waitCount = 0;
- boolean continueWait = true;
- while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
- state = retrieveNextRecord();
- switch (state) {
- case DATA_AVAILABLE:
- recordBuilder.reset(mutableRecord.getType());
- recordBuilder.init();
- writeRecord(mutableRecord, dataOutput, recordBuilder);
- break;
- case DATA_NOT_AVAILABLE:
- if (waitCount > timeout) {
- continueWait = false;
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Waiting to obtain data from pull based adaptor");
- }
- Thread.sleep(1000);
- waitCount++;
- }
- break;
- case NO_MORE_DATA:
- break;
- }
- }
- return state;
- } catch (Exception e) {
- throw new AsterixException(e);
- }
-
- }
-
- private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
- throws IOException, AsterixException {
- ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
- int numFields = record.getType().getFieldNames().length;
- for (int pos = 0; pos < numFields; pos++) {
- fieldValue.reset();
- IAObject obj = record.getValueByPos(pos);
- writeObject(obj, fieldValue.getDataOutput());
- recordBuilder.addField(pos, fieldValue);
- }
- recordBuilder.write(dataOutput, true);
- }
-
- private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
- switch (obj.getType().getTypeTag()) {
- case RECORD: {
- IARecordBuilder recordBuilder = new RecordBuilder();
- recordBuilder.reset((ARecordType) obj.getType());
- recordBuilder.init();
- writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
- break;
- }
-
- case ORDEREDLIST: {
- OrderedListBuilder listBuilder = new OrderedListBuilder();
- listBuilder.reset((AOrderedListType) ((AMutableOrderedList) obj).getType());
- IACursor cursor = ((AMutableOrderedList) obj).getCursor();
- ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
- while (cursor.next()) {
- listItemValue.reset();
- IAObject item = cursor.get();
- writeObject(item, listItemValue.getDataOutput());
- listBuilder.addItem(listItemValue);
- }
- listBuilder.write(dataOutput, true);
- break;
- }
-
- case UNORDEREDLIST: {
- UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
- IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
- ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
- while (cursor.next()) {
- listItemValue.reset();
- IAObject item = cursor.get();
- writeObject(item, listItemValue.getDataOutput());
- listBuilder.addItem(listItemValue);
- }
- listBuilder.write(dataOutput, true);
- break;
- }
-
- default:
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
- dataOutput);
- break;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
deleted file mode 100644
index ff9af0c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public abstract class FileSystemBasedAdapter implements IDatasourceAdapter {
-
- private static final long serialVersionUID = 1L;
-
- public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
- public abstract InputStream getInputStream(int partition) throws IOException;
-
- protected final ITupleParserFactory parserFactory;
- protected ITupleParser tupleParser;
- protected final IAType sourceDatatype;
- protected IHyracksTaskContext ctx;
-
- public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
- throws HyracksDataException {
- this.parserFactory = parserFactory;
- this.sourceDatatype = sourceDatatype;
- this.ctx = ctx;
- }
-
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- tupleParser = parserFactory.createTupleParser(ctx);
- InputStream in = getInputStream(partition);
- tupleParser.parse(in, writer);
- }
-
- public String getFilename(int partition) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
new file mode 100644
index 0000000..74e98dd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * Adapter with no ingestion logic of its own: every call is delegated to the
+ * {@link IDataFlowController} supplied at construction time.
+ */
+public class GenericAdapter implements IDataSourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+    private final IDataFlowController controller;
+
+    /**
+     * @param controller
+     *            the controller that receives all start, stop and exception calls
+     */
+    public GenericAdapter(IDataFlowController controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * Starts the wrapped controller. The {@code partition} argument is not used by
+     * this method; only {@code writer} is passed on.
+     */
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        this.controller.start(writer);
+    }
+
+    /**
+     * @return whatever the wrapped controller's {@code stop()} returns
+     */
+    @Override
+    public boolean stop() throws Exception {
+        final boolean stopped = this.controller.stop();
+        return stopped;
+    }
+
+    /**
+     * @param e
+     *            the failure to hand to the wrapped controller
+     * @return whatever the wrapped controller's {@code handleException} returns
+     */
+    @Override
+    public boolean handleException(Throwable e) {
+        final boolean handled = this.controller.handleException(e);
+        return handled;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
deleted file mode 100644
index 5f1b1ae..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
-import org.apache.asterix.external.indexing.input.GenericRecordReader;
-import org.apache.asterix.external.indexing.input.TextualDataReader;
-import org.apache.asterix.external.indexing.input.TextualFullScanDataReader;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Provides functionality for fetching external data stored in an HDFS instance.
- */
-
-public class HDFSAdapter extends FileSystemBasedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private transient String[] readSchedule;
- private transient boolean executed[];
- private transient InputSplit[] inputSplits;
- private transient JobConf conf;
- private transient String nodeName;
- private transient List<ExternalFile> files;
- private transient Map<String, String> configuration;
-
- public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
- String nodeName, ITupleParserFactory parserFactory, IHyracksTaskContext ctx,
- Map<String, String> configuration, List<ExternalFile> files) throws HyracksDataException {
- super(parserFactory, atype, ctx);
- this.readSchedule = readSchedule;
- this.executed = executed;
- this.inputSplits = inputSplits;
- this.conf = conf;
- this.nodeName = nodeName;
- this.files = files;
- this.configuration = configuration;
- }
-
- /*
- * The method below was modified to take care of the following
- * 1. when target files are not null, it generates a file aware input stream that validate against the files
- * 2. if the data is binary, it returns a generic reader
- */
- @Override
- public InputStream getInputStream(int partition) throws IOException {
- if ((conf.getInputFormat() instanceof TextInputFormat
- || conf.getInputFormat() instanceof SequenceFileInputFormat)
- && (AsterixTupleParserFactory.FORMAT_ADM
- .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT))
- || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
- .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
- if (files != null) {
- return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else {
- return new TextualFullScanDataReader(executed, inputSplits, readSchedule, nodeName, conf);
- }
- } else {
- if (files != null) {
- return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else {
- return new GenericRecordReader(inputSplits, readSchedule, nodeName, conf, executed);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
deleted file mode 100644
index 92a049d0..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
-import org.apache.asterix.external.indexing.input.RCFileDataReader;
-import org.apache.asterix.external.indexing.input.TextualDataReader;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
-
- private static final long serialVersionUID = 1L;
- private transient String[] readSchedule;
- private transient boolean executed[];
- private transient InputSplit[] inputSplits;
- private transient JobConf conf;
- private final List<ExternalFile> files;
- private transient String nodeName;
- // file input-format <text, seq, rc>
- private String inputFormat;
- // content format <adm, delimited-text, binary>
- private String format;
-
- public HDFSIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits,
- JobConf conf, AlgebricksPartitionConstraint clusterLocations, List<ExternalFile> files,
- ITupleParserFactory parserFactory, IHyracksTaskContext ctx, String nodeName, String inputFormat,
- String format) throws IOException {
- super(parserFactory, atype, ctx);
- this.nodeName = nodeName;
- this.readSchedule = readSchedule;
- this.executed = executed;
- this.inputSplits = inputSplits;
- this.conf = conf;
- this.files = files;
- this.inputFormat = inputFormat;
- this.format = format;
- }
-
- @Override
- public InputStream getInputStream(int partition) throws IOException {
- if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
- return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else if (format.equals(AsterixTupleParserFactory.FORMAT_ADM)
- || format.equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
- return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else {
- return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
deleted file mode 100644
index 1b8024d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Fetches data in the form of ADM records from a Hive dataset by delegating every stream request to a wrapped HDFSAdapter.
- */
-public class HiveAdapter extends FileSystemBasedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private HDFSAdapter hdfsAdapter; // delegate that actually opens the input stream
-
- public HiveAdapter(IAType atype, HDFSAdapter hdfsAdapter, ITupleParserFactory parserFactory, IHyracksTaskContext ctx)
- throws HyracksDataException {
- super(parserFactory, atype, ctx);
- this.hdfsAdapter = hdfsAdapter;
- }
-
- @Override
- public InputStream getInputStream(int partition) throws IOException {
- return hdfsAdapter.getInputStream(partition); // pure delegation; no Hive-specific handling happens here
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
deleted file mode 100644
index e71f10c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Serializable adapter contract exposing initialize, nextFrame, close and fail callbacks.
- * @author alamouda
- *
- */
-public interface IControlledAdapter extends Serializable {
-
- /**
- * Initializes the adapter with the task context and a null-writer factory.
- * @param ctx
- * @param iNullWriterFactory
- * @throws Exception
- */
- public void initialize(IHyracksTaskContext ctx, INullWriterFactory iNullWriterFactory) throws Exception;
-
- /**
- * Hands one input frame to the adapter together with the writer passed by the caller.
- * @param buffer
- * @param writer
- * @throws Exception
- */
- public void nextFrame(ByteBuffer buffer, IFrameWriter writer) throws Exception;
-
- /**
- * Closes the adapter; the caller supplies the writer.
- * @param writer
- * @throws Exception
- */
- public void close(IFrameWriter writer) throws Exception;
-
- /**
- * Gives the adapter a chance to clean up
- *
- * @throws Exception
- */
- public void fail() throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
deleted file mode 100644
index 6377960..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IFeedClient {
-
- public enum InflowState {
- NO_MORE_DATA,
- DATA_AVAILABLE,
- DATA_NOT_AVAILABLE
- }
-
- /**
- * Writes the next fetched tuple into the provided instance of DataOutput. Invocation of this method blocks until
- * a new tuple has been written or the specified time has expired.
- *
- * @param dataOutput
- * The receiving channel for the feed client to write ADM records to.
- * @param timeout
- * Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
- * @return the {@link InflowState} reported for this call
- * @throws AsterixException
- */
- public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
deleted file mode 100644
index b175518..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public interface IFeedClientFactory { // factory for IPullBasedFeedClient instances
-
- public IPullBasedFeedClient createFeedClient(IHyracksTaskContext ctx, Map<String, String> configuration)
- throws Exception; // configuration holds string key/value settings; the recognized keys are not visible here
-
- public ARecordType getRecordType() throws AsterixException;
-
- public FeedClientType getFeedClientType();
-
- public enum FeedClientType { // NOTE(review): the meaning of GENERIC vs TYPED is not defined in this file
- GENERIC,
- TYPED
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
deleted file mode 100644
index f6c9dad..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IPullBasedFeedClient {
-
- public enum InflowState {
- NO_MORE_DATA,
- DATA_AVAILABLE,
- DATA_NOT_AVAILABLE
- }
-
- /**
- * Writes the next fetched tuple into the provided instance of DataOutput. Invocation of this method blocks until
- * a new tuple has been written or the specified time has expired.
- *
- * @param dataOutput
- * The receiving channel for the feed client to write ADM records to.
- * @param timeout
- * Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
- * @return the {@link InflowState} reported for this call
- * @throws AsterixException
- */
- public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
new file mode 100644
index 0000000..ba6f83c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.indexing.RecordIdReader;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriter;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Frame writer that resolves the record ids carried by incoming tuples to external records,
+ * parses each record and forwards the resulting tuples to a downstream writer.
+ * <p>
+ * Every output tuple holds the propagated input fields (only when {@code propagateInput} is
+ * set) followed by one field with the parsed record. When no record is available for an input
+ * tuple and {@code retainNull} is set, that last field holds a serialized null instead;
+ * otherwise the input tuple produces no output.
+ *
+ * @param <T>
+ *            record payload type accepted by the data parser
+ */
+public final class LookupAdapter<T> implements IFrameWriter {
+
+    private final boolean propagateInput;
+    private final int[] propagatedFields;
+    private final boolean retainNull;
+    private final FrameTupleAppender appender;
+    private final IRecordDataParser<T> dataParser;
+    private final ILookupRecordReader<? extends T> recordReader;
+    private final RecordIdReader ridReader;
+    private final FrameTupleAccessor tupleAccessor;
+    private final IFrameWriter writer;
+    // The three fields below are assigned by configurePropagation(), called from the constructor.
+    private ArrayTupleBuilder tb;
+    private FrameTupleReference frameTuple;
+    private ArrayTupleBuilder nullTupleBuild;
+
+    /**
+     * @param dataParser
+     *            parser that serializes a fetched record into the output field
+     * @param recordReader
+     *            reader used to fetch a record for a record id
+     * @param inRecDesc
+     *            descriptor of the incoming tuples
+     * @param ridReader
+     *            extracts a record id from an incoming tuple; bound here to the input accessor
+     * @param propagateInput
+     *            whether input fields are copied in front of the record field
+     * @param propagatedFields
+     *            indexes of the input fields to copy; only read when {@code propagateInput} is set
+     * @param retainNull
+     *            whether a null record field is emitted when no record is available
+     * @param iNullWriterFactory
+     *            source of the null serializer; only used when {@code retainNull} is set
+     * @param ctx
+     *            task context used to allocate the output frame
+     * @param writer
+     *            downstream writer receiving the produced frames
+     * @throws HyracksDataException
+     *             if the output frame cannot be allocated or the null value cannot be serialized
+     */
+    public LookupAdapter(IRecordDataParser<T> dataParser, ILookupRecordReader<? extends T> recordReader,
+            RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, int[] propagatedFields,
+            boolean retainNull, INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
+            throws HyracksDataException {
+        this.dataParser = dataParser;
+        this.recordReader = recordReader;
+        this.propagateInput = propagateInput;
+        this.propagatedFields = propagatedFields;
+        this.retainNull = retainNull;
+        this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
+        this.ridReader = ridReader;
+        ridReader.set(tupleAccessor, inRecDesc);
+        configurePropagation(iNullWriterFactory);
+        this.appender = new FrameTupleAppender(new VSizeFrame(ctx));
+        this.writer = writer;
+    }
+
+    /**
+     * Sizes the output tuple builder and, when nulls are retained, pre-serializes the null value.
+     *
+     * @throws HyracksDataException
+     *             if the null value cannot be serialized; previously this failure was only printed,
+     *             which left a possibly empty null tuple in use
+     */
+    private void configurePropagation(INullWriterFactory iNullWriterFactory) throws HyracksDataException {
+        if (propagateInput) {
+            // One field per propagated input field plus one for the looked-up record.
+            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
+            frameTuple = new FrameTupleReference();
+        } else {
+            tb = new ArrayTupleBuilder(1);
+        }
+        if (retainNull) {
+            INullWriter nullWriter = iNullWriterFactory.createNullWriter();
+            nullTupleBuild = new ArrayTupleBuilder(1);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            try {
+                nullWriter.writeNull(out);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        } else {
+            nullTupleBuild = null;
+        }
+    }
+
+    /**
+     * Notifies the record reader of the failure and then always fails the downstream writer.
+     */
+    @Override
+    public void fail() throws HyracksDataException {
+        try {
+            recordReader.fail();
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
+        } finally {
+            writer.fail();
+        }
+    }
+
+    /**
+     * Opens the downstream writer.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    /**
+     * Looks up and emits one output tuple per input tuple of the frame (none for a tuple whose
+     * record is unavailable while {@code retainNull} is off).
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised while reading, parsing or appending
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            tupleAccessor.reset(buffer);
+            int tupleCount = tupleAccessor.getTupleCount();
+            for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+                IRawRecord<? extends T> record = null;
+                RecordId rid = ridReader.read(tupleIndex);
+                if (rid != null) {
+                    record = recordReader.read(rid);
+                }
+                tb.reset();
+                if (propagateInput) {
+                    propagate(tupleIndex);
+                }
+                if (record != null) {
+                    dataParser.parse(record, tb.getDataOutput());
+                    tb.addFieldEndOffset();
+                    DataflowUtils.addTupleToFrame(appender, tb, writer);
+                } else if (retainNull) {
+                    // Copy only the bytes that were written for the null value; getByteArray()
+                    // exposes the builder's backing buffer, which can be longer than its content.
+                    tb.getDataOutput().write(nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
+                    tb.addFieldEndOffset();
+                    DataflowUtils.addTupleToFrame(appender, tb, writer);
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Copies the configured input fields of tuple {@code idx} into the output tuple builder.
+     */
+    private void propagate(int idx) throws IOException {
+        frameTuple.reset(tupleAccessor, idx);
+        for (int i = 0; i < propagatedFields.length; i++) {
+            int field = propagatedFields[i];
+            tb.getDataOutput().write(frameTuple.getFieldData(field), frameTuple.getFieldStart(field),
+                    frameTuple.getFieldLength(field));
+            tb.addFieldEndOffset();
+        }
+    }
+
+    /**
+     * Flushes any buffered tuples and then always closes the downstream writer.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            appender.flush(writer, true);
+        } finally {
+            writer.close();
+        }
+    }
+}