You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:10 UTC
[12/21] incubator-asterixdb git commit: First stage of external data cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
new file mode 100644
index 0000000..9864805
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+/**
+ * Stream record reader that splits character input into records delimited by a configurable
+ * record-start and record-end character (see KEY_RECORD_START / KEY_RECORD_END).
+ * Nested start/end pairs are tracked with a depth counter, and delimiters that appear inside
+ * strings delimited by ExternalDataConstants.QUOTE (honouring ExternalDataConstants.ESCAPE)
+ * are ignored.
+ */
+public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+
+    // Nesting level of recordStart/recordEnd characters within the record being read.
+    private int depth;
+    // True when the previous character inside a string was an unconsumed escape character.
+    private boolean prevCharEscape;
+    // True while the scanner is inside a quoted string, where record delimiters are ignored.
+    private boolean inString;
+    private char recordStart;
+    private char recordEnd;
+    // Number of complete records produced by hasNext() so far.
+    private int recordNumber = 0;
+
+    /** @return the number of complete records read so far */
+    public int getRecordNumber() {
+        return recordNumber;
+    }
+
+    /**
+     * Configures the underlying stream reader and the record delimiters. Each delimiter defaults to
+     * ExternalDataConstants.DEFAULT_RECORD_START / DEFAULT_RECORD_END when not configured.
+     *
+     * @throws AsterixException if a configured delimiter is not exactly one character long
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
+        String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
+        recordStart = recStartString == null ? ExternalDataConstants.DEFAULT_RECORD_START
+                : toSingleChar(ExternalDataConstants.KEY_RECORD_START, recStartString);
+        recordEnd = recEndString == null ? ExternalDataConstants.DEFAULT_RECORD_END
+                : toSingleChar(ExternalDataConstants.KEY_RECORD_END, recEndString);
+    }
+
+    // Returns the only character of value, or fails if value is not exactly one character long.
+    private static char toSingleChar(String key, String value) throws AsterixException {
+        if (value.length() != 1) {
+            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(key,
+                    ExternalDataConstants.PARAMETER_OF_SIZE_ONE, value));
+        }
+        return value.charAt(0);
+    }
+
+    /**
+     * Reads the next complete record into {@code record}.
+     *
+     * @return true if a complete record was read; false at end of input (a record that was
+     *         started but not finished when the input ends is discarded)
+     * @throws IOException if a non-whitespace character other than recordStart appears between records
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        record.reset();
+        boolean hasStarted = false;
+        boolean hasFinished = false;
+        prevCharEscape = false;
+        inString = false;
+        depth = 0;
+        do {
+            int startPosn = bufferPosn; // starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    return false; // EOF
+                }
+            }
+            if (!hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
+                    if (inputBuffer[bufferPosn] == recordStart) {
+                        startPosn = bufferPosn;
+                        hasStarted = true;
+                        depth = 1;
+                        ++bufferPosn; // at next invocation proceed from following byte
+                        break;
+                    } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.LF
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
+                        // corrupted file. clear the buffer and stop reading
+                        reader.skipError();
+                        bufferPosn = bufferLength = 0;
+                        throw new IOException("Malformed input stream");
+                    }
+                }
+            }
+            if (hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record end
+                    if (inString) {
+                        // inside a string only an unescaped quote matters; delimiters are ignored
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+                            inString = false;
+                        }
+                        if (prevCharEscape) {
+                            // an escape only applies to the single character that follows it
+                            prevCharEscape = false;
+                        } else {
+                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                        }
+                    } else {
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+                            inString = true;
+                        } else if (inputBuffer[bufferPosn] == recordStart) {
+                            depth += 1;
+                        } else if (inputBuffer[bufferPosn] == recordEnd) {
+                            depth -= 1;
+                            if (depth == 0) {
+                                hasFinished = true;
+                                ++bufferPosn; // at next invocation proceed from following byte
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Only copy characters once the record has started; otherwise whitespace skipped while
+            // searching for recordStart would be prepended to the record.
+            int appendLength = bufferPosn - startPosn;
+            if (hasStarted && appendLength > 0) {
+                record.append(inputBuffer, startPosn, appendLength);
+            }
+        } while (!hasFinished);
+        record.endRecord();
+        recordNumber++;
+        return true;
+    }
+
+    /** Stopping is not supported by this reader; always returns false. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
new file mode 100644
index 0000000..c294ccb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+public class SequenceLookupReader extends AbstractCharRecordLookupReader {
+
+ public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+ super(snapshotAccessor, fs, conf);
+ }
+
+ private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
+ private Reader reader;
+ private Writable key;
+
+ @Override
+ protected void readRecord(RecordId rid) throws IOException {
+ reader.seek(rid.getOffset());
+ reader.next(key, value);
+ }
+
+ @Override
+ protected void closeFile() {
+ if (reader == null) {
+ return;
+ }
+ try {
+ reader.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing HDFS file ", e);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void openFile() throws IllegalArgumentException, IOException {
+ reader = new SequenceFile.Reader(fs, new Path(file.getFileName()), conf);
+ key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
new file mode 100644
index 0000000..b276bfa
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader that fetches individual lines from an HDFS text file by seeking to the offset
+ * stored in a {@link RecordId}.
+ */
+public class TextLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
+
+    // Created on the first openFile() and reused for every subsequently opened file.
+    private HDFSTextLineReader reader;
+
+    public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /** Seeks to the record's offset and reads one line into {@code value}. */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        reader.seek(rid.getOffset());
+        reader.readLine(value);
+    }
+
+    /** Closes the current file if a reader exists; close failures are logged, not propagated. */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file ", e);
+        }
+    }
+
+    /** Opens the current {@code file} on the file system and points the (reused) line reader at it. */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        if (reader == null) {
+            reader = new HDFSTextLineReader();
+        }
+        reader.resetReader(fs.open(new Path(file.getFileName())));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
new file mode 100644
index 0000000..34d8122
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+/**
+ * Record reader that periodically polls the Twitter search API for the configured query and
+ * returns matching tweets one at a time.
+ */
+public class TwitterPullRecordReader implements IRecordReader<Status> {
+
+    private String keywords;
+    private Query query;
+    private Twitter twitter;
+    // Pause before each search request, in seconds; overridden by SearchAPIConstants.INTERVAL when set.
+    private int requestInterval = 5;
+    // Most recent search result and the index of the next tweet to hand out from it.
+    private QueryResult result;
+    private int nextTweetIndex = 0;
+    // Highest tweet id returned so far; used as since_id so later searches only return newer tweets.
+    private long lastTweetIdReceived = 0;
+    // Reusable wrapper returned by next(); its content is overwritten on every call.
+    private GenericRecord<Status> record;
+
+    public TwitterPullRecordReader() {
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Creates the Twitter client and the search query from the configuration. The request interval
+     * keeps its default when SearchAPIConstants.INTERVAL is not configured.
+     *
+     * @throws NumberFormatException if a configured interval is not an integer
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        twitter = TwitterUtil.getTwitterService(configuration);
+        keywords = configuration.get(SearchAPIConstants.QUERY);
+        String interval = configuration.get(SearchAPIConstants.INTERVAL);
+        if (interval != null) {
+            requestInterval = Integer.parseInt(interval.trim());
+        }
+        query = new Query(keywords);
+        query.setCount(100);
+        record = new GenericRecord<Status>();
+    }
+
+    /** The feed is unbounded, so there is always a potential next record. */
+    @Override
+    public boolean hasNext() throws Exception {
+        return true;
+    }
+
+    /**
+     * Returns the next tweet from the current search result. When the result is exhausted, waits
+     * {@code requestInterval} seconds and issues a new search for tweets newer than the last one seen.
+     *
+     * @return the next tweet, or null if the latest search returned no tweets
+     * @throws HyracksDataException wrapping a TwitterException from the search call
+     */
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        if (result == null || nextTweetIndex >= result.getTweets().size()) {
+            // long arithmetic so large intervals cannot overflow the millisecond value
+            Thread.sleep(1000L * requestInterval);
+            query.setSinceId(lastTweetIdReceived);
+            try {
+                result = twitter.search(query);
+            } catch (TwitterException e) {
+                throw new HyracksDataException(e);
+            }
+            nextTweetIndex = 0;
+        }
+        if (result != null && !result.getTweets().isEmpty()) {
+            List<Status> tw = result.getTweets();
+            Status tweet = tw.get(nextTweetIndex++);
+            if (lastTweetIdReceived < tweet.getId()) {
+                lastTweetIdReceived = tweet.getId();
+            }
+            record.set(tweet);
+            return record;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Class<Status> getRecordClass() throws IOException {
+        return Status.class;
+    }
+
+    /** Stopping is not supported by this reader; always returns false. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
new file mode 100644
index 0000000..e7c141d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+
+/**
+ * Record reader backed by the Twitter streaming API. A listener buffers incoming tweets in a queue,
+ * and {@link #next()} drains that queue without blocking.
+ */
+public class TwitterPushRecordReader implements IRecordReader<Status> {
+    // Buffer of tweets pushed by the stream listener and drained by next().
+    private LinkedBlockingQueue<Status> inputQ;
+    private TwitterStream twitterStream;
+    // Reusable wrapper returned by next(); its content is overwritten on every call.
+    private GenericRecord<Status> record;
+
+    /** Detaches the listener and cleans up the stream; a no-op if configure() never created it. */
+    @Override
+    public void close() throws IOException {
+        if (twitterStream == null) {
+            return;
+        }
+        twitterStream.clearListeners();
+        twitterStream.cleanUp();
+    }
+
+    /**
+     * Creates the stream, registers the buffering listener and starts consuming: with the configured
+     * filter query if there is one, otherwise with the sample stream.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Status>();
+        inputQ = new LinkedBlockingQueue<Status>();
+        twitterStream = TwitterUtil.getTwitterStream(configuration);
+        twitterStream.addListener(new TweetListener(inputQ));
+        FilterQuery query = TwitterUtil.getFilterQuery(configuration);
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample();
+        }
+    }
+
+    /** The stream is unbounded, so there is always a potential next record. */
+    @Override
+    public boolean hasNext() throws Exception {
+        return true;
+    }
+
+    /**
+     * Returns the next buffered tweet without waiting.
+     *
+     * @return the next tweet, or null if none is currently buffered
+     */
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        Status tweet = inputQ.poll();
+        if (tweet == null) {
+            return null;
+        }
+        record.set(tweet);
+        return record;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() throws IOException {
+        return Status.class;
+    }
+
+    /** Stopping is not supported by this reader; always returns false. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    /**
+     * Stream listener that enqueues every received status. Static so it does not keep a hidden
+     * reference to the enclosing reader.
+     */
+    private static class TweetListener implements StatusListener {
+
+        private final LinkedBlockingQueue<Status> inputQ;
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        // The remaining callbacks are intentionally ignored; NOTE(review): stream exceptions are
+        // currently dropped silently — consider logging them.
+        @Override
+        public void onException(Exception arg0) {
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
new file mode 100644
index 0000000..e9fad25
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.input.record.reader.RCLookupReader;
+import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
+import org.apache.asterix.external.input.record.reader.TextLookupReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+/**
+ * Factory for lookup readers over HDFS files. The reader implementation is chosen by the configured
+ * input format (text, sequence or RC).
+ */
+public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
+
+    protected static final long serialVersionUID = 1L;
+    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected ConfFactory confFactory;
+    protected Map<String, String> configuration;
+
+    public HDFSLookupReaderFactory() {
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Computes (and caches in {@code clusterLocations}) the partition constraint via HDFSUtils. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+        return clusterLocations;
+    }
+
+    /** Stores the configuration and builds the Hadoop job configuration factory from it. */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates the lookup reader that matches the configured input format.
+     *
+     * @throws AsterixException if no input format is configured or it is not recognised
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
+            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT);
+        if (inputFormatParameter == null) {
+            // Report the missing parameter instead of failing with a NullPointerException on trim().
+            throw new AsterixException("No input format specified: " + ExternalDataConstants.KEY_INPUT_FORMAT);
+        }
+        inputFormatParameter = inputFormatParameter.trim();
+        JobConf conf = confFactory.getConf();
+        FileSystem fs = FileSystem.get(conf);
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
+            default:
+                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
new file mode 100644
index 0000000..05d419d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.LineRecordReader;
+import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+ LineRecordReader recordReader;
+ if (quoteString != null) {
+ recordReader = new QuotedLineRecordReader();
+ } else {
+ recordReader = new LineRecordReader();
+ }
+ return configureReader(recordReader, ctx, partition);
+ }
+
+ @Override
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
new file mode 100644
index 0000000..a672f2f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.RSSRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Factory for RSS feed readers. The configured KEY_RSS_URL may list several comma-separated feed
+ * URLs; one partition (and one reader) is created per URL.
+ */
+public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    private List<String> urls = new ArrayList<String>();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** One partition per configured feed URL. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(urls.size());
+    }
+
+    /**
+     * Stores the configuration and parses the feed URL list.
+     *
+     * @throws IllegalArgumentException if no non-blank RSS URL is configured
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
+        if (url == null) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+        initializeURLs(url);
+    }
+
+    // Splits the comma-separated list, trimming surrounding whitespace (e.g. "a, b") and skipping
+    // empty entries so no partition is created for a blank URL.
+    private void initializeURLs(String url) {
+        urls.clear();
+        for (String rssURL : url.split(",")) {
+            String trimmed = rssURL.trim();
+            if (!trimmed.isEmpty()) {
+                urls.add(trimmed);
+            }
+        }
+        if (urls.isEmpty()) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /** Creates and configures a reader for the feed URL assigned to {@code partition}. */
+    @Override
+    public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
new file mode 100644
index 0000000..91b439c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
+ return configureReader(recordReader, ctx, partition);
+ }
+
+ @Override
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
new file mode 100644
index 0000000..72aaa37
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
+import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
+
+ private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+ private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+
+ private Map<String, String> configuration;
+ private boolean pull;
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.configuration = configuration;
+ TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+ if (!validateConfiguration(configuration)) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("One or more parameters are missing from adapter configuration\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+ throw new Exception(builder.toString());
+ }
+ if (ExternalDataUtils.isPull(configuration)) {
+ pull = true;
+ if (configuration.get(SearchAPIConstants.QUERY) == null) {
+ throw new AsterixException(
+ "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
+ }
+ String interval = configuration.get(SearchAPIConstants.INTERVAL);
+ if (interval != null) {
+ try {
+ Integer.parseInt(interval);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException(
+ "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
+ }
+ } else {
+ configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+ + DEFAULT_INTERVAL + ")");
+ }
+ }
+ } else if (ExternalDataUtils.isPush(configuration)) {
+ pull = false;
+ } else {
+ throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
+ + ExternalDataConstants.KEY_PUSH + "must be specified as part of adaptor configuration");
+ }
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ IRecordReader<Status> reader;
+ if (pull) {
+ reader = new TwitterPullRecordReader();
+ } else {
+ reader = new TwitterPushRecordReader();
+ }
+ reader.configure(configuration);
+ return reader;
+ }
+
+ @Override
+ public Class<? extends Status> getRecordClass() {
+ return Status.class;
+ }
+
+ private boolean validateConfiguration(Map<String, String> configuration) {
+ String consumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
+ String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+ String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+ String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+ if (consumerKey == null || consumerSecret == null || accessToken == null || tokenSecret == null) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
new file mode 100644
index 0000000..73f6195
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.InputStream;
+
/**
 * Input stream used by external data adapters. Beyond the plain {@link InputStream} contract, subclasses
 * expose hooks for stopping the source and for recovering after the consumer hits a bad record.
 */
public abstract class AInputStream extends InputStream {

    /**
     * Asks the stream to stop producing data.
     *
     * @return implementation-defined acknowledgement (e.g. the Twitter firehose stream returns {@code true}
     *         after stopping its generator; plain wrappers return {@code false})
     */
    public abstract boolean stop() throws Exception;

    /**
     * Asks the stream to move past an error encountered by its consumer.
     *
     * @return {@code true} if the stream can continue (e.g. the socket stream accepts a new connection),
     *         {@code false} if it offers no recovery
     */
    public abstract boolean skipError() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
new file mode 100644
index 0000000..e573f74
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+
+public class AInputStreamReader extends InputStreamReader {
+ private AInputStream in;
+
+ public AInputStreamReader(AInputStream in) {
+ super(in);
+ this.in = in;
+ }
+
+ public boolean skipError() throws Exception {
+ return in.skipError();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
new file mode 100644
index 0000000..aa7a3d8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class BasicInputStream extends AInputStream {
+ private final InputStream in;
+
+ public BasicInputStream(InputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte b[]) throws IOException {
+ return in.read(b);
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return in.skip(n);
+
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ in.mark(readlimit);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ in.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ @Override
+ public boolean skipError() {
+ return false;
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
new file mode 100644
index 0000000..b3ad1c3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
+
+ public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+ JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
+ super(read, inputSplits, readSchedule, nodeName, conf);
+ value = new Text();
+ configure(configuration);
+ if (snapshot != null) {
+ setSnapshot(snapshot);
+ setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+ if (currentSplitIndex < snapshot.size()) {
+ indexer.reset(this);
+ }
+ }
+ }
+
+ @Override
+ public AInputStream getInputStream() throws Exception {
+ return new HDFSInputStream();
+ }
+
+ private class HDFSInputStream extends AInputStream {
+ int pos = 0;
+
+ @Override
+ public int read() throws IOException {
+ if (value.getLength() < pos) {
+ if (!readMore()) {
+ return -1;
+ }
+ } else if (value.getLength() == pos) {
+ pos++;
+ return ExternalDataConstants.EOL;
+ }
+ return value.getBytes()[pos++];
+ }
+
+ private int readRecord(byte[] buffer, int offset, int len) {
+ int actualLength = value.getLength() + 1;
+ if ((actualLength - pos) > len) {
+ //copy partial record
+ System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+ pos += len;
+ return len;
+ } else {
+ int numBytes = value.getLength() - pos;
+ System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
+ buffer[offset + numBytes] = ExternalDataConstants.LF;
+ pos += numBytes;
+ numBytes++;
+ return numBytes;
+ }
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (value.getLength() > pos) {
+ return readRecord(buffer, offset, len);
+ }
+ if (!readMore()) {
+ return -1;
+ }
+ return readRecord(buffer, offset, len);
+ }
+
+ private boolean readMore() throws IOException {
+ try {
+ pos = 0;
+ return HDFSInputStreamProvider.this.hasNext();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean skipError() throws Exception {
+ return true;
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ return false;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
new file mode 100644
index 0000000..b511617
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class LocalFSInputStreamProvider implements IInputStreamProvider {
+
+ private FileSplit[] fileSplits;
+ private int partition;
+
+ public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
+ Map<String, String> configuration, int partition) {
+ this.partition = partition;
+ this.fileSplits = fileSplits;
+ }
+
+ @Override
+ public AInputStream getInputStream() throws Exception {
+ FileSplit split = fileSplits[partition];
+ File inputFile = split.getLocalFile().getFile();
+ InputStream in;
+ try {
+ in = new FileInputStream(inputFile);
+ return new BasicInputStream(in);
+ } catch (FileNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
new file mode 100644
index 0000000..2253a73
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class SocketInputStream extends AInputStream {
+ private ServerSocket server;
+ private Socket socket;
+ private InputStream connectionStream;
+
+ public SocketInputStream(ServerSocket server) throws IOException {
+ this.server = server;
+ socket = server.accept();
+ connectionStream = socket.getInputStream();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int read = connectionStream.read();
+ while (read < 0) {
+ accept();
+ read = connectionStream.read();
+ }
+ return read;
+ }
+
+ @Override
+ public boolean skipError() throws Exception {
+ accept();
+ return true;
+ }
+
+ @Override
+ public int read(byte b[]) throws IOException {
+ int read = connectionStream.read(b, 0, b.length);
+ while (read < 0) {
+ accept();
+ read = connectionStream.read(b, 0, b.length);
+ }
+ return read;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ int read = connectionStream.read(b, off, len);
+ while (read < 0) {
+ accept();
+ read = connectionStream.read(b, off, len);
+ }
+ return read;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ connectionStream.close();
+ socket.close();
+ server.close();
+ }
+
+ private void accept() throws IOException {
+ connectionStream.close();
+ socket.close();
+ socket = server.accept();
+ connectionStream = socket.getInputStream();
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
new file mode 100644
index 0000000..1f920e9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+
+public class SocketInputStreamProvider implements IInputStreamProvider {
+ private ServerSocket server;
+
+ public SocketInputStreamProvider(ServerSocket server) {
+ this.server = server;
+ }
+
+ @Override
+ public AInputStream getInputStream() throws Exception {
+ return new SocketInputStream(server);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
new file mode 100644
index 0000000..d32a94f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.runtime.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
+
+ private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
+
+ private ExecutorService executorService;
+
+ private PipedOutputStream outputStream;
+
+ private PipedInputStream inputStream;
+
+ private TwitterServer twitterServer;
+
+ public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
+ throws Exception {
+ executorService = Executors.newCachedThreadPool();
+ outputStream = new PipedOutputStream();
+ inputStream = new PipedInputStream(outputStream);
+ twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+ }
+
+ @Override
+ public AInputStream getInputStream() throws Exception {
+ twitterServer.start();
+ return twitterServer;
+ }
+
+ private static class TwitterServer extends AInputStream {
+ private final DataProvider dataProvider;
+ private final ExecutorService executorService;
+ private InputStream in;
+ private boolean started;
+
+ public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
+ ExecutorService executorService, InputStream in) throws Exception {
+ dataProvider = new DataProvider(configuration, partition, os);
+ this.executorService = executorService;
+ this.in = in;
+ this.started = false;
+ }
+
+ @Override
+ public boolean stop() throws IOException {
+ dataProvider.stop();
+ return true;
+ }
+
+ public void start() {
+ executorService.execute(dataProvider);
+ }
+
+ @Override
+ public boolean skipError() throws Exception {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!started) {
+ start();
+ started = true;
+ }
+ return in.read();
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ if (!started) {
+ start();
+ started = true;
+ }
+ return in.read(b, off, len);
+ }
+ }
+
+ private static class DataProvider implements Runnable {
+
+ public static final String KEY_MODE = "mode";
+
+ private TweetGenerator tweetGenerator;
+ private boolean continuePush = true;
+ private int batchSize;
+ private final Mode mode;
+ private final OutputStream os;
+
+ public static enum Mode {
+ AGGRESSIVE,
+ CONTROLLED
+ }
+
+ public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+ this.tweetGenerator = new TweetGenerator(configuration, partition);
+ this.tweetGenerator.registerSubscriber(os);
+ this.os = os;
+ mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
+ : Mode.AGGRESSIVE;
+ switch (mode) {
+ case CONTROLLED:
+ String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+ if (tpsValue == null) {
+ throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+ }
+ batchSize = Integer.parseInt(tpsValue);
+ break;
+ case AGGRESSIVE:
+ batchSize = 5000;
+ break;
+ }
+ }
+
+ @Override
+ public void run() {
+ boolean moreData = true;
+ long startBatch;
+ long endBatch;
+
+ while (true) {
+ try {
+ while (moreData && continuePush) {
+ switch (mode) {
+ case AGGRESSIVE:
+ moreData = tweetGenerator.generateNextBatch(batchSize);
+ break;
+ case CONTROLLED:
+ startBatch = System.currentTimeMillis();
+ moreData = tweetGenerator.generateNextBatch(batchSize);
+ endBatch = System.currentTimeMillis();
+ if (endBatch - startBatch < 1000) {
+ Thread.sleep(1000 - (endBatch - startBatch));
+ }
+ break;
+ }
+ }
+ os.close();
+ break;
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in adaptor " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ public void stop() {
+ continuePush = false;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
new file mode 100644
index 0000000..14c712a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.util.DNSResolverFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+ protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
+ protected static INodeResolver nodeResolver;
+ protected Map<String, String> configuration;
+ protected FileSplit[] fileSplits;
+
+ @Override
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+ return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition);
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.configuration = configuration;
+ String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
+ configureFileSplits(splits);
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return configurePartitionConstraint();
+ }
+
+ private void configureFileSplits(String[] splits) throws AsterixException {
+ if (fileSplits == null) {
+ fileSplits = new FileSplit[splits.length];
+ String nodeName;
+ String nodeLocalPath;
+ int count = 0;
+ String trimmedValue;
+ for (String splitPath : splits) {
+ trimmedValue = splitPath.trim();
+ if (!trimmedValue.contains("://")) {
+ throw new AsterixException(
+ "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
+ }
+ nodeName = trimmedValue.split(":")[0];
+ nodeLocalPath = trimmedValue.split("://")[1];
+ FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+ fileSplits[count++] = fileSplit;
+ }
+ }
+ }
+
+ private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
+ String[] locs = new String[fileSplits.length];
+ String location;
+ for (int i = 0; i < fileSplits.length; i++) {
+ location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+ locs[i] = location;
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locs);
+ }
+
+ protected INodeResolver getNodeResolver() {
+ if (nodeResolver == null) {
+ synchronized (DEFAULT_NODE_RESOLVER) {
+ if (nodeResolver == null) {
+ nodeResolver = initializeNodeResolver();
+ }
+ }
+ }
+ return nodeResolver;
+ }
+
+ private static INodeResolver initializeNodeResolver() {
+ INodeResolver nodeResolver = null;
+ String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
+ if (configuredNodeResolverFactory != null) {
+ try {
+ nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
+ .createNodeResolver();
+
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
+ + configuredNodeResolverFactory + "\n" + e.getMessage());
+ }
+ nodeResolver = DEFAULT_NODE_RESOLVER;
+ }
+ } else {
+ nodeResolver = DEFAULT_NODE_RESOLVER;
+ }
+ return nodeResolver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
new file mode 100644
index 0000000..37afa53
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.ServerSocket;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Random;
+ import java.util.Set;
+
+ import org.apache.asterix.common.exceptions.AsterixException;
+ import org.apache.asterix.external.api.IInputStreamProvider;
+ import org.apache.asterix.external.api.IInputStreamProviderFactory;
+ import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+ import org.apache.asterix.external.util.ExternalDataConstants;
+ import org.apache.asterix.om.util.AsterixRuntimeUtil;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+ import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+ import org.apache.hyracks.algebricks.common.utils.Pair;
+ import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Creates {@link SocketInputStreamProvider}s that listen on server sockets, one per configured
+ * {@code host:port} entry of the {@code sockets} parameter; each entry becomes one partition.
+ */
+public class SocketInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private List<Pair<String, Integer>> sockets;
+    private Mode mode = Mode.IP;
+
+    /** How the host part of each {@code host:port} entry is interpreted. */
+    public static enum Mode {
+        /** The host is the name of an AsterixDB node controller. */
+        NC,
+        /** The host is resolved to an address; one node controller on that address is picked at random. */
+        IP
+    }
+
+    /**
+     * Parses the configuration into the (node controller, port) pairs to listen on.
+     *
+     * @param configuration adapter configuration; the sockets parameter ({@link ExternalDataConstants#KEY_SOCKETS})
+     *            is required, the mode parameter ({@link ExternalDataConstants#KEY_MODE}) is optional
+     * @throws IllegalArgumentException if the sockets parameter is missing or malformed, or an entry names a
+     *             host or node controller that is not part of the cluster
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        sockets = new ArrayList<Pair<String, Integer>>();
+        String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
+        if (modeValue != null) {
+            // Locale.ROOT: default-locale upper-casing turns "ip" into "\u0130P" under a Turkish locale
+            mode = Mode.valueOf(modeValue.trim().toUpperCase(Locale.ROOT));
+        }
+        String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+        if (socketsValue == null) {
+            throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adapter configuration");
+        }
+        Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+        Random random = new Random();
+        for (String socket : socketsValue.split(",")) {
+            String trimmed = socket.trim();
+            // the port is always the last ':'-separated token
+            int separator = trimmed.lastIndexOf(':');
+            if (separator <= 0 || separator == trimmed.length() - 1) {
+                throw new IllegalArgumentException("Invalid socket " + socket + ". Expected format host:port");
+            }
+            String host = trimmed.substring(0, separator).trim();
+            int port = parsePort(trimmed.substring(separator + 1).trim(), socket);
+            String nc;
+            switch (mode) {
+                case IP: {
+                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
+                        throw new IllegalArgumentException("Invalid host " + host
+                                + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                + StringUtils.join(ncMap.keySet(), ", "));
+                    }
+                    String[] ncArray = ncsOnIp.toArray(new String[] {});
+                    nc = ncArray[random.nextInt(ncArray.length)];
+                    break;
+                }
+                case NC:
+                    if (!ncs.contains(host)) {
+                        throw new IllegalArgumentException(
+                                "Invalid NC " + host + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                        + StringUtils.join(ncs, ", "));
+                    }
+                    nc = host;
+                    break;
+                default:
+                    throw new IllegalStateException("Unsupported mode " + mode);
+            }
+            sockets.add(new Pair<String, Integer>(nc, port));
+        }
+    }
+
+    /**
+     * Parses and range-checks a port number.
+     *
+     * @throws IllegalArgumentException if the value is not an integer in [0, 65535]
+     */
+    private static int parsePort(String portValue, String socket) {
+        int port;
+        try {
+            port = Integer.parseInt(portValue);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid port in socket " + socket, e);
+        }
+        if (port < 0 || port > 65535) {
+            throw new IllegalArgumentException(
+                    "Invalid port in socket " + socket + ". Port must be between 0 and 65535");
+        }
+        return port;
+    }
+
+    /**
+     * Opens a server socket on the port configured for the given partition and wraps it in a provider.
+     * The socket is closed again if the provider cannot be created, so the port is not leaked.
+     */
+    @Override
+    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws IOException, AsterixException {
+        Pair<String, Integer> socket = sockets.get(partition);
+        ServerSocket server = new ServerSocket(socket.second);
+        boolean created = false;
+        try {
+            IInputStreamProvider provider = new SocketInputStreamProvider(server);
+            created = true;
+            return provider;
+        } finally {
+            if (!created) {
+                server.close();
+            }
+        }
+    }
+
+    /** Places one partition on the node controller chosen for each configured socket, in order. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() {
+        List<String> locations = new ArrayList<String>(sockets.size());
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
+
+    public List<Pair<String, Integer>> getSockets() {
+        return sockets;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
new file mode 100644
index 0000000..b86c294
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
+ * simulates a twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
+ */
+public class TwitterFirehoseStreamProviderFactory implements IInputStreamProviderFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Degree of parallelism for feed ingestion activity. Defaults to 1. This
+ * determines the count constraint for the ingestion operator.
+ **/
+ private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+ /**
+ * The absolute locations where ingestion operator instances will be placed.
+ **/
+ private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+ private Map<String, String> configuration;
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+ String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+ String[] locations = null;
+ if (ingestionLocationParam != null) {
+ locations = ingestionLocationParam.split(",");
+ }
+ int count = locations != null ? locations.length : 1;
+ if (ingestionCardinalityParam != null) {
+ count = Integer.parseInt(ingestionCardinalityParam);
+ }
+
+ List<String> chosenLocations = new ArrayList<String>();
+ String[] availableLocations = locations != null ? locations
+ : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
+ for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+ chosenLocations.add(availableLocations[k]);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index 14f831b..e9c15cb 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -21,6 +21,9 @@ package org.apache.asterix.external.library;
import java.io.IOException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;