You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text version.)
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/03/16 00:36:25 UTC
[09/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index d61dc5c..d55ac87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -19,34 +19,50 @@
package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
-import java.util.Map;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class LineRecordReader extends AbstractStreamRecordReader {
protected boolean prevCharCR;
protected int newlineLength;
protected int recordNumber = 0;
- private boolean configured = false;
+
+ public LineRecordReader(final boolean hasHeader, final AInputStream stream, final IExternalIndexer indexer)
+ throws HyracksDataException {
+ super(stream, indexer);
+ try {
+ if (hasHeader) {
+ if (hasNext()) {
+ next();
+ }
+ }
+ } catch (final IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
@Override
public boolean hasNext() throws IOException {
if (done) {
return false;
}
- /* We're reading data from in, but the head of the stream may be
+ /*
+ * We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
+ * everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to record.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to record, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
+ * in CR. In this case we copy everything up to CR to record, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need consume LF as well, so next call to readLine will read
+ * from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
@@ -95,17 +111,4 @@ public class LineRecordReader extends AbstractStreamRecordReader {
recordNumber++;
return true;
}
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- if (!configured) {
- super.configure(configuration);
- if (ExternalDataUtils.hasHeader(configuration)) {
- if (hasNext()) {
- next();
- }
- }
- }
- configured = true;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
index f0867d3..68f10f6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -18,26 +18,30 @@
*/
package org.apache.asterix.external.input.record.reader.stream;
-import java.util.Map;
-
+import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
private static final long serialVersionUID = 1L;
@Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
- LineRecordReader recordReader;
+ boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+ Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
if (quoteString != null) {
- recordReader = new QuotedLineRecordReader();
+ return new QuotedLineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second, quoteString);
} else {
- recordReader = new LineRecordReader();
+ return new LineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second);
}
- return configureReader(recordReader, ctx, partition);
}
@Override
@@ -45,8 +49,4 @@ public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<c
return char[].class;
}
- @Override
- protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index a8eb07b..6266aa2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -19,24 +19,24 @@
package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
-import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class QuotedLineRecordReader extends LineRecordReader {
- private char quote;
+ private final char quote;
private boolean prevCharEscape;
private boolean inQuote;
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- super.configure(configuration);
- String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
- if (quoteString == null || quoteString.length() != 1) {
- throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+ public QuotedLineRecordReader(final boolean hasHeader, final AInputStream stream, final IExternalIndexer indexer,
+ final String quoteString) throws HyracksDataException {
+ super(hasHeader, stream, indexer);
+ if ((quoteString == null) || (quoteString.length() != 1)) {
+ throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
}
this.quote = quoteString.charAt(0);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index f41486e..678dd03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -19,9 +19,10 @@
package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
-import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataExceptionUtils;
@@ -34,15 +35,10 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
private char recordEnd;
private int recordNumber = 0;
- public int getRecordNumber() {
- return recordNumber;
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- super.configure(configuration);
- String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
- String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
+ public SemiStructuredRecordReader(AInputStream stream, IExternalIndexer indexer, String recStartString,
+ String recEndString) throws AsterixException {
+ super(stream, indexer);
+ // set record opening char
if (recStartString != null) {
if (recStartString.length() != 1) {
throw new AsterixException(
@@ -53,6 +49,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
} else {
recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
}
+ // set record ending char
if (recEndString != null) {
if (recEndString.length() != 1) {
throw new AsterixException(
@@ -65,6 +62,10 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
}
}
+ public int getRecordNumber() {
+ return recordNumber;
+ }
+
@Override
public boolean hasNext() throws Exception {
if (done) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
index ec8eac9..206ae50 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -18,27 +18,34 @@
*/
package org.apache.asterix.external.input.record.reader.stream;
-import java.util.Map;
-
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
private static final long serialVersionUID = 1L;
@Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
- SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
- return configureReader(recordReader, ctx, partition);
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
+ try {
+ return new SemiStructuredRecordReader(streamAndIndexer.first, streamAndIndexer.second,
+ configuration.get(ExternalDataConstants.KEY_RECORD_START),
+ configuration.get(ExternalDataConstants.KEY_RECORD_END));
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
}
@Override
public Class<? extends char[]> getRecordClass() {
return char[].class;
}
-
- @Override
- protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 617bc39..be9ce06 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -20,15 +20,12 @@ package org.apache.asterix.external.input.record.reader.twitter;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import twitter4j.Query;
@@ -39,7 +36,6 @@ import twitter4j.TwitterException;
public class TwitterPullRecordReader implements IRecordReader<Status> {
- private String keywords;
private Query query;
private Twitter twitter;
private int requestInterval = 5; // seconds
@@ -48,7 +44,12 @@ public class TwitterPullRecordReader implements IRecordReader<Status> {
private long lastTweetIdReceived = 0;
private GenericRecord<Status> record;
- public TwitterPullRecordReader() {
+ public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
+ this.twitter = twitter;
+ this.requestInterval = requestInterval;
+ this.query = new Query(keywords);
+ this.query.setCount(100);
+ this.record = new GenericRecord<Status>();
}
@Override
@@ -56,16 +57,6 @@ public class TwitterPullRecordReader implements IRecordReader<Status> {
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
- twitter = TwitterUtil.getTwitterService(configuration);
- keywords = configuration.get(SearchAPIConstants.QUERY);
- requestInterval = Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL));
- query = new Query(keywords);
- query.setCount(100);
- record = new GenericRecord<Status>();
- }
-
- @Override
public boolean hasNext() throws Exception {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 19f156c..64695b5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -19,7 +19,6 @@
package org.apache.asterix.external.input.record.reader.twitter;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.external.api.IDataFlowController;
@@ -27,7 +26,6 @@ import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TwitterUtil;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
@@ -42,6 +40,22 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
private GenericRecord<Status> record;
private boolean closed = false;
+ public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
+ record = new GenericRecord<Status>();
+ inputQ = new LinkedBlockingQueue<Status>();
+ this.twitterStream = twitterStream;
+ this.twitterStream.addListener(new TweetListener(inputQ));
+ this.twitterStream.filter(query);
+ }
+
+ public TwitterPushRecordReader(TwitterStream twitterStream) {
+ record = new GenericRecord<Status>();
+ inputQ = new LinkedBlockingQueue<Status>();
+ this.twitterStream = twitterStream;
+ this.twitterStream.addListener(new TweetListener(inputQ));
+ this.twitterStream.sample();
+ }
+
@Override
public void close() throws IOException {
if (!closed) {
@@ -53,20 +67,6 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
- twitterStream = TwitterUtil.getTwitterStream(configuration);
- twitterStream.addListener(new TweetListener(inputQ));
- FilterQuery query = TwitterUtil.getFilterQuery(configuration);
- if (query != null) {
- twitterStream.filter(query);
- } else {
- twitterStream.sample();
- }
- }
-
- @Override
public boolean hasNext() throws Exception {
return !closed;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index a2a4742..7ca185f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -33,7 +33,9 @@ import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import twitter4j.FilterQuery;
import twitter4j.Status;
public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
@@ -54,13 +56,13 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
return clusterLocations;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws AsterixException {
this.configuration = configuration;
TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
if (!validateConfiguration(configuration)) {
@@ -70,7 +72,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
- throw new Exception(builder.toString());
+ throw new AsterixException(builder.toString());
}
if (ExternalDataUtils.isPull(configuration)) {
pull = true;
@@ -107,15 +109,22 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
}
@Override
- public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
- IRecordReader<Status> reader;
+ public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
if (pull) {
- reader = new TwitterPullRecordReader();
+ return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+ configuration.get(SearchAPIConstants.QUERY),
+ Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
} else {
- reader = new TwitterPushRecordReader();
+ FilterQuery query;
+ try {
+ query = TwitterUtil.getFilterQuery(configuration);
+ return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
+ : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
}
- reader.configure(configuration);
- return reader;
}
@Override
@@ -128,7 +137,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
- if (consumerKey == null || consumerSecret == null || accessToken == null || tokenSecret == null) {
+ if ((consumerKey == null) || (consumerSecret == null) || (accessToken == null) || (tokenSecret == null)) {
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 469e866..b78f96d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -18,9 +18,7 @@
*/
package org.apache.asterix.external.input.stream;
-import java.io.IOException;
import java.io.InputStream;
-import java.util.Map;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
@@ -30,8 +28,6 @@ public abstract class AInputStream extends InputStream {
public abstract boolean stop() throws Exception;
- public abstract void configure(Map<String, String> configuration) throws IOException;
-
// TODO: Find a better way to send notifications
public abstract void setController(AbstractFeedDataFlowController controller);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 89008aa..bf85330 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -19,7 +19,7 @@
package org.apache.asterix.external.input.stream;
import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
@@ -29,7 +29,7 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
-public class AInputStreamReader extends InputStreamReader {
+public class AInputStreamReader extends Reader {
private AInputStream in;
private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
@@ -38,7 +38,6 @@ public class AInputStreamReader extends InputStreamReader {
private boolean done = false;
public AInputStreamReader(AInputStream in) {
- super(in);
this.in = in;
this.decoder = StandardCharsets.UTF_8.newDecoder();
this.byteBuffer.flip();
@@ -74,22 +73,41 @@ public class AInputStreamReader extends InputStreamReader {
if (done) {
return -1;
}
+ int len = 0;
charBuffer.clear();
- if (byteBuffer.hasRemaining()) {
+ while (charBuffer.position() == 0) {
+ // reset on every pass so each iteration really reads from the stream
+ len = 0;
+ if (byteBuffer.hasRemaining()) {
+ decoder.decode(byteBuffer, charBuffer, false);
+ if (charBuffer.position() > 0) {
+ System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+ return charBuffer.position();
+ }
+ // only an incomplete multi-byte sequence is left: compact() moves it to the front
+ // (position = leftover count, limit = capacity) so new bytes are appended after it
+ byteBuffer.compact();
+ } else {
+ byteBuffer.clear();
+ }
+ while (len == 0) {
+ len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
+ }
+ if (len == -1) {
+ done = true;
+ return len;
+ }
+ // readable region = carried-over bytes + bytes just read
+ byteBuffer.position(byteBuffer.position() + len);
+ byteBuffer.flip();
decoder.decode(byteBuffer, charBuffer, false);
System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
- return charBuffer.position();
}
- int len = in.read(bytes, 0, bytes.length);
- if (len == -1) {
- done = true;
- return len;
- }
- byteBuffer.clear();
- byteBuffer.position(len);
- byteBuffer.flip();
- decoder.decode(byteBuffer, charBuffer, false);
- System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
return charBuffer.position();
}
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
index 5b654eb..176f5f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.input.stream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Map;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
@@ -89,10 +88,6 @@ public class BasicInputStream extends AInputStream {
}
@Override
- public void configure(Map<String, String> configuration) {
- }
-
- @Override
public void setFeedLogManager(FeedLogManager logManager) {
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 8dcd5b6..dc6a130 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -21,7 +21,6 @@ package org.apache.asterix.external.input.stream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.Map;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -34,8 +33,10 @@ public class LocalFileSystemInputStream extends AInputStream {
private FileInputStream in;
private byte lastByte;
- public LocalFileSystemInputStream(Path inputResource, String expression, boolean isFeed) throws IOException {
+ public LocalFileSystemInputStream(Path inputResource, String expression, boolean isFeed)
+ throws HyracksDataException {
this.watcher = new FileSystemWatcher(inputResource, expression, isFeed);
+ watcher.init();
}
@Override
@@ -105,10 +106,10 @@ public class LocalFileSystemInputStream extends AInputStream {
}
}
int result = in.read(b, off, len);
- while (result < 0 && advance()) {
+ while ((result < 0) && advance()) {
// return a new line at the end of every file <--Might create problems for some cases
// depending on the parser implementation-->
- if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != ExternalDataConstants.BYTE_LF) {
+ if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) {
lastByte = ExternalDataConstants.BYTE_LF;
b[off] = ExternalDataConstants.BYTE_LF;
return 1;
@@ -117,7 +118,7 @@ public class LocalFileSystemInputStream extends AInputStream {
result = in.read(b, off, len);
}
if (result > 0) {
- lastByte = b[off + result - 1];
+ lastByte = b[(off + result) - 1];
}
return result;
}
@@ -133,9 +134,4 @@ public class LocalFileSystemInputStream extends AInputStream {
watcher.close();
return true;
}
-
- @Override
- public void configure(Map<String, String> configuration) throws IOException {
- watcher.init();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
deleted file mode 100644
index 67c4493..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SocketInputStream extends AInputStream {
- private ServerSocket server;
- private Socket socket;
- private InputStream connectionStream;
- private AbstractFeedDataFlowController controller;
-
- public SocketInputStream(ServerSocket server) throws IOException {
- this.server = server;
- socket = new Socket();
- connectionStream = new InputStream() {
- @Override
- public int read() throws IOException {
- return -1;
- }
- };
- }
-
- @Override
- public int read() throws IOException {
- int read = connectionStream.read();
- while (read < 0) {
- accept();
- read = connectionStream.read();
- }
- return read;
- }
-
- @Override
- public boolean skipError() throws Exception {
- accept();
- return true;
- }
-
- @Override
- public int read(byte b[]) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if (server == null) {
- return -1;
- }
- int read = -1;
- try {
- if (connectionStream.available() < 1) {
- controller.flush();
- }
- read = connectionStream.read(b, off, len);
- } catch (IOException e) {
- e.printStackTrace();
- read = -1;
- }
- while (read < 0) {
- if (!accept()) {
- return -1;
- }
- try {
- read = connectionStream.read(b, off, len);
- } catch (IOException e) {
- e.printStackTrace();
- read = -1;
- }
- }
- return read;
- }
-
- @Override
- public long skip(long n) throws IOException {
- return 0;
- }
-
- @Override
- public int available() throws IOException {
- return 1;
- }
-
- @Override
- public synchronized void close() throws IOException {
- HyracksDataException hde = null;
- try {
- if (connectionStream != null) {
- connectionStream.close();
- }
- connectionStream = null;
- } catch (IOException e) {
- hde = new HyracksDataException(e);
- }
- try {
- if (socket != null) {
- socket.close();
- }
- socket = null;
- } catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppress(hde, e);
- }
- try {
- if (server != null) {
- server.close();
- }
- } catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppress(hde, e);
- } finally {
- server = null;
- }
- if (hde != null) {
- throw hde;
- }
- }
-
- private boolean accept() throws IOException {
- try {
- connectionStream.close();
- connectionStream = null;
- socket.close();
- socket = null;
- socket = server.accept();
- connectionStream = socket.getInputStream();
- return true;
- } catch (Exception e) {
- close();
- return false;
- }
- }
-
- @Override
- public boolean stop() throws Exception {
- close();
- return true;
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- this.controller = controller;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
new file mode 100644
index 0000000..1c33709
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link AInputStream} over the connections accepted by a {@link ServerSocket}, one client at a
+ * time: when the current connection reaches end-of-stream (or, for {@link #read(byte[], int, int)},
+ * fails), the next client is accepted and reading continues from it. Once the server socket is closed,
+ * reads return -1. {@link #skip(long)} never skips and {@link #available()} always reports 1.
+ */
+public class SocketServerInputStream extends AInputStream {
+    private static final Logger LOGGER = Logger.getLogger(SocketServerInputStream.class.getName());
+    private ServerSocket server;
+    private Socket socket;
+    private InputStream connectionStream;
+    private AbstractFeedDataFlowController controller;
+
+    /**
+     * @param server
+     *            the server socket to accept client connections on; it is closed when this stream is closed
+     */
+    public SocketServerInputStream(ServerSocket server) {
+        this.server = server;
+        socket = new Socket();
+        // Start with an always-empty stream so that the first read triggers accept().
+        connectionStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (server == null) {
+            return -1;
+        }
+        int read = connectionStream.read();
+        while (read < 0) {
+            // A failed accept() closes this stream (connectionStream becomes null), so stop here
+            // rather than dereferencing it.
+            if (!accept()) {
+                return -1;
+            }
+            read = connectionStream.read();
+        }
+        return read;
+    }
+
+    @Override
+    public boolean skipError() throws Exception {
+        accept();
+        return true;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (server == null) {
+            return -1;
+        }
+        int read = -1;
+        try {
+            // Nothing is buffered on the connection: let the controller flush before the read below may block.
+            // The controller is only set through setController(), so it may be absent.
+            if ((controller != null) && (connectionStream.available() < 1)) {
+                controller.flush();
+            }
+            read = connectionStream.read(b, off, len);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failed to read from socket connection", e);
+            read = -1;
+        }
+        while (read < 0) {
+            if (!accept()) {
+                return -1;
+            }
+            try {
+                read = connectionStream.read(b, off, len);
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Failed to read from socket connection", e);
+                read = -1;
+            }
+        }
+        return read;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return 1;
+    }
+
+    /**
+     * Closes the current connection stream, the client socket and the server socket. Each close is attempted
+     * even if an earlier one fails; failures are accumulated with ExternalDataExceptionUtils.suppress and thrown.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        HyracksDataException hde = null;
+        try {
+            if (connectionStream != null) {
+                connectionStream.close();
+            }
+            connectionStream = null;
+        } catch (IOException e) {
+            hde = new HyracksDataException(e);
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+            socket = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        try {
+            if (server != null) {
+                server.close();
+            }
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        } finally {
+            server = null;
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    /**
+     * Drops the current client connection and blocks until the next client connects.
+     *
+     * @return true if a new connection was accepted; false if anything failed, in which case
+     *         {@link #close()} has been called on this stream
+     */
+    private boolean accept() throws IOException {
+        try {
+            connectionStream.close();
+            connectionStream = null;
+            socket.close();
+            socket = null;
+            socket = server.accept();
+            connectionStream = socket.getInputStream();
+            return true;
+        } catch (Exception e) {
+            close();
+            return false;
+        }
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        close();
+        return true;
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager logManager) {
+        // The feed log manager is not used by this stream.
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        this.controller = controller;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 5c1583e..54ee780 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -47,15 +47,14 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
protected static INodeResolver nodeResolver;
protected Map<String, String> configuration;
protected FileSplit[] inputFileSplits;
- protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log
- // storage
+ protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
protected boolean isFeed;
protected String expression;
// transient fields (They don't need to be serialized and transferred)
private transient AlgebricksAbsolutePartitionConstraint constraints;
@Override
- public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) {
return new LocalFSInputStreamProvider(inputFileSplits, ctx, configuration, partition, expression, isFeed);
}
@@ -70,7 +69,7 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws AsterixException {
this.configuration = configuration;
String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
configureFileSplits(splits);
@@ -84,7 +83,7 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
return constraints;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
new file mode 100644
index 0000000..5e84123
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.provider.SocketClientInputStreamProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link SocketClientInputStreamProvider}s over the comma-separated {@code host:port} list given
+ * in the {@code sockets} adapter parameter. Partition {@code i} is served by the {@code i}-th endpoint.
+ */
+public class SocketClientInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private List<Pair<String, Integer>> sockets;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size());
+        return clusterLocations;
+    }
+
+    /**
+     * Parses the comma-separated {@code sockets} parameter ({@code host:port[,host:port...]}), replacing each
+     * host by the first address it resolves to.
+     *
+     * @throws IllegalArgumentException
+     *             if the parameter is missing or an entry is not a valid {@code host:port} pair
+     * @throws AsterixException
+     *             if a host cannot be resolved
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+        if (socketsValue == null) {
+            throw new IllegalArgumentException(
+                    "\'sockets\' parameter not specified as part of adapter configuration");
+        }
+        List<Pair<String, Integer>> parsed = new ArrayList<Pair<String, Integer>>();
+        try {
+            for (String socket : socketsValue.split(",")) {
+                String[] socketTokens = socket.split(":");
+                if (socketTokens.length != 2) {
+                    throw new IllegalArgumentException("Invalid socket '" + socket.trim() + "': expected host:port");
+                }
+                String host = socketTokens[0].trim();
+                int port;
+                try {
+                    port = Integer.parseInt(socketTokens[1].trim());
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid port in socket '" + socket.trim() + "'", e);
+                }
+                InetAddress[] resolved = SystemDefaultDnsResolver.INSTANCE.resolve(host);
+                parsed.add(new Pair<String, Integer>(resolved[0].getHostAddress(), port));
+            }
+        } catch (UnknownHostException e) {
+            throw new AsterixException(e);
+        }
+        // Only publish the list once every entry has been parsed and resolved.
+        sockets = parsed;
+    }
+
+    /**
+     * Creates the provider for {@code partition}, reading from the {@code partition}-th configured socket.
+     */
+    @Override
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return new SocketClientInputStreamProvider(sockets.get(partition));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
deleted file mode 100644
index 6fdc42d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class SocketInputStreamProviderFactory implements IInputStreamProviderFactory {
-
- private static final long serialVersionUID = 1L;
- private List<Pair<String, Integer>> sockets;
- private Mode mode = Mode.IP;
-
- public static enum Mode {
- NC,
- IP
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- sockets = new ArrayList<Pair<String, Integer>>();
- String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
- if (modeValue != null) {
- mode = Mode.valueOf(modeValue.trim().toUpperCase());
- }
- String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
- if (socketsValue == null) {
- throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adapter configuration");
- }
- Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
- List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
- String[] socketsArray = socketsValue.split(",");
- Random random = new Random();
- for (String socket : socketsArray) {
- String[] socketTokens = socket.split(":");
- String host = socketTokens[0].trim();
- int port = Integer.parseInt(socketTokens[1].trim());
- Pair<String, Integer> p = null;
- switch (mode) {
- case IP:
- Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
- if (ncsOnIp == null || ncsOnIp.isEmpty()) {
- throw new IllegalArgumentException("Invalid host " + host
- + " as it is not part of the AsterixDB cluster. Valid choices are "
- + StringUtils.join(ncMap.keySet(), ", "));
- }
- String[] ncArray = ncsOnIp.toArray(new String[] {});
- String nc = ncArray[random.nextInt(ncArray.length)];
- p = new Pair<String, Integer>(nc, port);
- break;
-
- case NC:
- p = new Pair<String, Integer>(host, port);
- if (!ncs.contains(host)) {
- throw new IllegalArgumentException(
- "Invalid NC " + host + " as it is not part of the AsterixDB cluster. Valid choices are "
- + StringUtils.join(ncs, ", "));
-
- }
- break;
- }
- sockets.add(p);
- }
- }
-
- @Override
- public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
- throws IOException, AsterixException {
- Pair<String, Integer> socket = sockets.get(partition);
- ServerSocket server = new ServerSocket(socket.second);
- return new SocketInputStreamProvider(server);
- }
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- List<String> locations = new ArrayList<String>();
- for (Pair<String, Integer> socket : sockets) {
- locations.add(socket.first);
- }
- return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
- }
-
- public List<Pair<String, Integer>> getSockets() {
- return sockets;
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.STREAM;
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
new file mode 100644
index 0000000..a301c1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.provider.SocketServerInputStreamProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link SocketServerInputStreamProvider}s that listen on the ports given in the {@code sockets}
+ * adapter parameter ({@code host:port[,host:port...]}). Partition {@code i} opens a server socket on the
+ * {@code i}-th port; the partition constraint lists the node controller chosen for each entry, in order.
+ */
+public class SocketServerInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private List<Pair<String, Integer>> sockets;
+    private Mode mode = Mode.IP;
+
+    /** How the host part of each {@code sockets} entry is interpreted. */
+    public static enum Mode {
+        /** The host is a node controller name. */
+        NC,
+        /** The host is an address of a cluster machine; one of its node controllers is picked at random. */
+        IP
+    }
+
+    /**
+     * Parses the optional {@code mode} parameter and the {@code sockets} parameter, mapping every entry to a
+     * (node controller, port) pair.
+     *
+     * @throws AsterixException
+     *             if a parameter is missing or invalid, or any other step of the mapping fails
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            List<Pair<String, Integer>> parsed = new ArrayList<Pair<String, Integer>>();
+            String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
+            if (modeValue != null) {
+                // Locale.ROOT: in e.g. a Turkish default locale, toUpperCase() maps 'i' to a dotted capital I.
+                mode = Mode.valueOf(modeValue.trim().toUpperCase(Locale.ROOT));
+            }
+            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+            if (socketsValue == null) {
+                throw new IllegalArgumentException(
+                        "\'sockets\' parameter not specified as part of adapter configuration");
+            }
+            Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+            List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+            Random random = new Random();
+            for (String socket : socketsValue.split(",")) {
+                String[] socketTokens = socket.split(":");
+                if (socketTokens.length != 2) {
+                    throw new IllegalArgumentException("Invalid socket '" + socket.trim() + "': expected host:port");
+                }
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                Pair<String, Integer> p = null;
+                switch (mode) {
+                    case IP:
+                        Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+                            throw new IllegalArgumentException("Invalid host " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncMap.keySet(), ", "));
+                        }
+                        String[] ncArray = ncsOnIp.toArray(new String[] {});
+                        String nc = ncArray[random.nextInt(ncArray.length)];
+                        p = new Pair<String, Integer>(nc, port);
+                        break;
+
+                    case NC:
+                        if (!ncs.contains(host)) {
+                            throw new IllegalArgumentException("Invalid NC " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncs, ", "));
+                        }
+                        p = new Pair<String, Integer>(host, port);
+                        break;
+                }
+                parsed.add(p);
+            }
+            sockets = parsed;
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    /**
+     * Opens a server socket on the port configured for {@code partition}.
+     */
+    @Override
+    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            ServerSocket server = new ServerSocket(sockets.get(partition).second);
+            return new SocketServerInputStreamProvider(server);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** One location per configured socket: the node controller chosen for it by {@link #configure}. */
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        List<String> locations = new ArrayList<String>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
+
+    /** Returns the (node controller, port) pairs computed by {@link #configure}, in configuration order. */
+    public List<Pair<String, Integer>> getSockets() {
+        return sockets;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 95378cb..7b09ade 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -28,6 +28,7 @@ import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStr
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
@@ -53,7 +54,7 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
private Map<String, String> configuration;
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
String[] locations = null;
@@ -80,7 +81,7 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) {
this.configuration = configuration;
}
@@ -90,7 +91,8 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
}
@Override
- public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
return new TwitterFirehoseInputStreamProvider(configuration, ctx, partition);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
index bf9653d..e1ab331 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -38,12 +38,10 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
- super(read, inputSplits, readSchedule, nodeName, conf);
+ super(read, inputSplits, readSchedule, nodeName, conf, snapshot,
+ snapshot == null ? null : ExternalIndexerProvider.getIndexer(configuration));
value = new Text();
- configure(configuration);
if (snapshot != null) {
- setSnapshot(snapshot);
- setIndexer(ExternalIndexerProvider.getIndexer(configuration));
if (currentSplitIndex < snapshot.size()) {
indexer.reset(this);
}
@@ -51,7 +49,7 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
}
@Override
- public AInputStream getInputStream() throws Exception {
+ public AInputStream getInputStream() {
return new HDFSInputStream();
}
@@ -119,10 +117,6 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
}
@Override
- public void configure(Map<String, String> configuration) {
- }
-
- @Override
public void setFeedLogManager(FeedLogManager logManager) {
}
@@ -130,8 +124,4 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
public void setController(AbstractFeedDataFlowController controller) {
}
}
-
- @Override
- public void configure(Map<String, String> configuration) {
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
index 77520d4..fbe6035 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.stream.provider;
-import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
@@ -27,38 +26,33 @@ import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.FileSplit;
public class LocalFSInputStreamProvider implements IInputStreamProvider {
- private String expression;
- private boolean isFeed;
- private Path path;
+ private final String expression;
+ private final boolean isFeed;
+ private final Path path;
private FeedLogManager feedLogManager;
- private Map<String, String> configuration;
- public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
- Map<String, String> configuration, int partition, String expression, boolean isFeed) {
+ public LocalFSInputStreamProvider(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
+ final Map<String, String> configuration, final int partition, final String expression,
+ final boolean isFeed) {
this.expression = expression;
this.isFeed = isFeed;
this.path = fileSplits[partition].getLocalFile().getFile().toPath();
}
@Override
- public AInputStream getInputStream() throws IOException {
- LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
+ public AInputStream getInputStream() throws HyracksDataException {
+ final LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
stream.setFeedLogManager(feedLogManager);
- stream.configure(configuration);
return stream;
}
@Override
- public void configure(Map<String, String> configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(final FeedLogManager feedLogManager) {
this.feedLogManager = feedLogManager;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
new file mode 100644
index 0000000..f842638
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+public class SocketClientInputStreamProvider implements IInputStreamProvider {
+
+ private static final Logger LOGGER = Logger.getLogger(SocketClientInputStreamProvider.class.getName());
+ private final Socket socket;
+
+ public SocketClientInputStreamProvider(Pair<String, Integer> ipAndPort) throws HyracksDataException {
+ try {
+ socket = new Socket(ipAndPort.first, ipAndPort.second);
+ } catch (IOException e) {
+ LOGGER.error(
+ "Problem in creating socket against host " + ipAndPort.first + " on the port " + ipAndPort.second,
+ e);
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public AInputStream getInputStream() throws HyracksDataException {
+ InputStream in;
+ try {
+ in = socket.getInputStream();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return new AInputStream() {
+ @Override
+ public int read() throws IOException {
+ throw new IOException("method not supported. use read(byte[] buffer, int offset, int length) instead");
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ return in.read(buffer, offset, length);
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ if (!socket.isClosed()) {
+ try {
+ in.close();
+ } finally {
+ socket.close();
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean skipError() throws Exception {
+ return false;
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager logManager) {
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ }
+ };
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
deleted file mode 100644
index b6da314..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.net.ServerSocket;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.SocketInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public class SocketInputStreamProvider implements IInputStreamProvider {
- private ServerSocket server;
-
- public SocketInputStreamProvider(ServerSocket server) {
- this.server = server;
- }
-
- @Override
- public AInputStream getInputStream() throws Exception {
- return new SocketInputStream(server);
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
new file mode 100644
index 0000000..64f0342
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketServerInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+
+public class SocketServerInputStreamProvider implements IInputStreamProvider {
+ private final ServerSocket server;
+
+ public SocketServerInputStreamProvider(ServerSocket server) {
+ this.server = server;
+ }
+
+ @Override
+ public AInputStream getInputStream() {
+ return new SocketServerInputStream(server);
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index cd4a3c1..a979262 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -35,40 +35,45 @@ import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.TweetGenerator;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
- private ExecutorService executorService;
+ private final ExecutorService executorService;
- private PipedOutputStream outputStream;
+ private final PipedOutputStream outputStream;
- private PipedInputStream inputStream;
+ private final PipedInputStream inputStream;
- private TwitterServer twitterServer;
+ private final TwitterServer twitterServer;
public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
- throws Exception {
- executorService = Executors.newCachedThreadPool();
- outputStream = new PipedOutputStream();
- inputStream = new PipedInputStream(outputStream);
- twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+ throws HyracksDataException {
+ try {
+ executorService = Executors.newCachedThreadPool();
+ outputStream = new PipedOutputStream();
+ inputStream = new PipedInputStream(outputStream);
+ twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
@Override
- public AInputStream getInputStream() throws Exception {
+ public AInputStream getInputStream() {
return twitterServer;
}
private static class TwitterServer extends AInputStream {
private final DataProvider dataProvider;
private final ExecutorService executorService;
- private InputStream in;
+ private final InputStream in;
private boolean started;
public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
- ExecutorService executorService, InputStream in) throws Exception {
+ ExecutorService executorService, InputStream in) {
dataProvider = new DataProvider(configuration, partition, os);
this.executorService = executorService;
this.in = in;
@@ -111,10 +116,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
}
@Override
- public void configure(Map<String, String> configuration) {
- }
-
- @Override
public void setFeedLogManager(FeedLogManager logManager) {
}
@@ -127,7 +128,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
public static final String KEY_MODE = "mode";
- private TweetGenerator tweetGenerator;
+ private final TweetGenerator tweetGenerator;
private boolean continuePush = true;
private int batchSize;
private final Mode mode;
@@ -138,7 +139,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
CONTROLLED
}
- public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+ public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
this.tweetGenerator = new TweetGenerator(configuration, partition);
this.tweetGenerator.registerSubscriber(os);
this.os = os;
@@ -163,7 +164,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
boolean moreData = true;
long startBatch;
long endBatch;
-
while (true) {
try {
while (moreData && continuePush) {
@@ -175,7 +175,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
startBatch = System.currentTimeMillis();
moreData = tweetGenerator.generateNextBatch(batchSize);
endBatch = System.currentTimeMillis();
- if (endBatch - startBatch < 1000) {
+ if ((endBatch - startBatch) < 1000) {
Thread.sleep(1000 - (endBatch - startBatch));
}
break;
@@ -194,11 +194,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
public void stop() {
continuePush = false;
}
-
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
}
@Override