You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:05 UTC
[11/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
deleted file mode 100644
index ae5c050..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feeds;
-
-import java.rmi.RemoteException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-
-public class FeedPolicyEnforcer {
-
- private final FeedConnectionId connectionId;
- private final FeedPolicyAccessor policyAccessor;
-
- public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
- this.connectionId = feedConnectionId;
- this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
- }
-
- public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
- return policyAccessor.continueOnSoftFailure();
- }
-
- public FeedPolicyAccessor getFeedPolicyAccessor() {
- return policyAccessor;
- }
-
- public FeedConnectionId getFeedId() {
- return connectionId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
index 3b59b98..93ba0a0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
@@ -37,6 +37,7 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
protected int bufferLength = 0;
protected int bufferPosn = 0;
protected IExternalIndexer indexer;
+ protected boolean done = false;
@Override
public IRawRecord<char[]> next() throws IOException {
@@ -45,7 +46,10 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
@Override
public void close() throws IOException {
- reader.close();
+ if (!done) {
+ reader.close();
+ }
+ done = true;
}
public void setInputStream(AInputStream inputStream) throws IOException {
@@ -72,4 +76,15 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
public void setIndexer(IExternalIndexer indexer) {
this.indexer = indexer;
}
+
+ @Override
+ public boolean stop() {
+ try {
+ reader.stop();
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
index 9b11df6..2b33d7a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
@@ -32,6 +32,9 @@ public class LineRecordReader extends AbstractStreamRecordReader {
@Override
public boolean hasNext() throws IOException {
+ if (done) {
+ return false;
+ }
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
@@ -63,7 +66,7 @@ public class LineRecordReader extends AbstractStreamRecordReader {
recordNumber++;
return true;
}
- reader.close();
+ close();
return false; //EOF
}
}
@@ -92,11 +95,6 @@ public class LineRecordReader extends AbstractStreamRecordReader {
}
@Override
- public boolean stop() {
- return false;
- }
-
- @Override
public void configure(Map<String, String> configuration) throws Exception {
super.configure(configuration);
if (ExternalDataUtils.hasHeader(configuration)) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
index 668876e..49e67e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
@@ -44,6 +44,9 @@ public class QuotedLineRecordReader extends LineRecordReader {
@Override
public boolean hasNext() throws IOException {
+ if (done) {
+ return false;
+ }
newlineLength = 0;
prevCharCR = false;
prevCharEscape = false;
@@ -65,6 +68,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
recordNumber++;
return true;
}
+ close();
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
index 9864805..84c96d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
@@ -67,6 +67,9 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
@Override
public boolean hasNext() throws Exception {
+ if (done) {
+ return false;
+ }
record.reset();
boolean hasStarted = false;
boolean hasFinished = false;
@@ -79,6 +82,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength <= 0) {
+ close();
return false; // EOF
}
}
@@ -142,6 +146,12 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
@Override
public boolean stop() {
- return false;
+ try {
+ reader.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
index e7c141d..3ce6a81 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
@@ -38,11 +38,16 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
private LinkedBlockingQueue<Status> inputQ;
private TwitterStream twitterStream;
private GenericRecord<Status> record;
+ private boolean closed = false;
@Override
public void close() throws IOException {
- twitterStream.clearListeners();
- twitterStream.cleanUp();
+ if (!closed) {
+ twitterStream.clearListeners();
+ twitterStream.cleanUp();
+ twitterStream = null;
+ closed = true;
+ }
}
@Override
@@ -61,7 +66,7 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
@Override
public boolean hasNext() throws Exception {
- return true;
+ return !closed;
}
@Override
@@ -81,7 +86,12 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
@Override
public boolean stop() {
- return false;
+ try {
+ close();
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
}
private class TweetListener implements StatusListener {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
index 72aaa37..6840c11 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
@@ -97,7 +97,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
pull = false;
} else {
throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
- + ExternalDataConstants.KEY_PUSH + "must be specified as part of adaptor configuration");
+ + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e573f74..7ba6032 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.stream;
+import java.io.IOException;
import java.io.InputStreamReader;
public class AInputStreamReader extends InputStreamReader {
@@ -31,4 +32,12 @@ public class AInputStreamReader extends InputStreamReader {
public boolean skipError() throws Exception {
return in.skipError();
}
+
+ public void stop() throws IOException {
+ try {
+ in.stop();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
index b3ad1c3..8f4c094 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
@@ -63,7 +63,7 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
}
} else if (value.getLength() == pos) {
pos++;
- return ExternalDataConstants.EOL;
+ return ExternalDataConstants.BYTE_LF;
}
return value.getBytes()[pos++];
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
index b511617..22d0a87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
@@ -19,38 +19,44 @@
package org.apache.asterix.external.input.stream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
+import java.nio.file.Path;
import java.util.Map;
import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.std.file.FileSplit;
public class LocalFSInputStreamProvider implements IInputStreamProvider {
- private FileSplit[] fileSplits;
- private int partition;
+ private String expression;
+ private boolean isFeed;
+ private Path path;
+ private File feedLogFile;
public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
- Map<String, String> configuration, int partition) {
- this.partition = partition;
- this.fileSplits = fileSplits;
+ Map<String, String> configuration, int partition, String expression, boolean isFeed,
+ FileSplit[] feedLogFileSplits) {
+ this.expression = expression;
+ this.isFeed = isFeed;
+ this.path = fileSplits[partition].getLocalFile().getFile().toPath();
+ if (feedLogFileSplits != null) {
+ this.feedLogFile = FeedUtils
+ .getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
+ feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager())
+ .getFile();
+
+ }
}
@Override
- public AInputStream getInputStream() throws Exception {
- FileSplit split = fileSplits[partition];
- File inputFile = split.getLocalFile().getFile();
- InputStream in;
- try {
- in = new FileInputStream(inputFile);
- return new BasicInputStream(in);
- } catch (FileNotFoundException e) {
- throw new IOException(e);
+ public AInputStream getInputStream() throws IOException {
+ FeedLogManager feedLogManager = null;
+ if (isFeed && feedLogFile != null) {
+ feedLogManager = new FeedLogManager(feedLogFile);
}
+ return new LocalFileSystemInputStream(path, expression, feedLogManager, isFeed);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
new file mode 100644
index 0000000..7eebe4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link AInputStream} that reads, one after another, the files supplied by a {@link FileSystemWatcher} and
+ * presents their contents as a single byte stream.
+ * <p>
+ * If a file does not end with a line terminator, a single line feed is injected before the next file's bytes so
+ * that the last line of one file is not joined to the first line of the next.
+ */
+public class LocalFileSystemInputStream extends AInputStream {
+    private static final byte CARRIAGE_RETURN = '\r';
+
+    private final FileSystemWatcher watcher;
+    // The file currently being read, or null when no file is open.
+    private FileInputStream in;
+    // Last byte returned to the caller. Starts as a line feed so that no separator is injected before any data
+    // has been emitted (e.g. when the first file is empty).
+    private byte lastByte = ExternalDataConstants.BYTE_LF;
+
+    /**
+     * Creates a stream over the files supplied by a {@link FileSystemWatcher} built from the given arguments.
+     *
+     * @param inputResource
+     *            path the watcher is created for
+     * @param expression
+     *            expression passed to the watcher (presumably a file-name pattern; TODO confirm)
+     * @param logManager
+     *            feed log manager passed to the watcher; callers may pass {@code null}
+     * @param isFeed
+     *            whether the watcher serves a feed
+     * @throws IOException
+     *             if creating or initializing the watcher fails
+     */
+    public LocalFileSystemInputStream(Path inputResource, String expression, FeedLogManager logManager, boolean isFeed)
+            throws IOException {
+        this.watcher = new FileSystemWatcher(logManager, inputResource, expression, isFeed);
+        this.watcher.init();
+    }
+
+    /**
+     * Closes the current file (if any) and the watcher. Both are attempted even if the first one fails; the first
+     * failure is thrown, with any later failure attached as suppressed.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException failure = null;
+        try {
+            closeFile();
+        } catch (IOException e) {
+            failure = e;
+        } catch (RuntimeException e) {
+            failure = new IOException(e);
+        }
+        try {
+            watcher.close();
+        } catch (Exception e) {
+            if (failure == null) {
+                throw e;
+            }
+            failure.addSuppressed(e);
+        }
+        // Report a file-close failure even when the watcher closed cleanly.
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Closes the current file, if one is open, and forgets it.
+     */
+    private void closeFile() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, if any.
+     *
+     * @return {@code true} if a next file was opened, {@code false} if the watcher reports no next file
+     */
+    private boolean advance() throws IOException {
+        closeFile();
+        if (watcher.hasNext()) {
+            in = new FileInputStream(watcher.next());
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Single-byte reads are not supported by this stream.
+     *
+     * @throws HyracksDataException
+     *             always; use {@link #read(byte[], int, int)} instead
+     */
+    @Override
+    public int read() throws IOException {
+        throw new HyracksDataException(
+                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+    }
+
+    /**
+     * Reads up to {@code len} bytes, moving on to the next file whenever the current one is exhausted.
+     *
+     * @return the number of bytes read, or {@code -1} once the watcher reports no next file
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (in == null && !advance()) {
+            return -1;
+        }
+        int result = in.read(b, off, len);
+        while (result < 0 && advance()) {
+            // If the previous file did not end with a line terminator (LF or CR), emit a line feed before the
+            // next file's bytes. NOTE: this may not suit parsers whose records can span files.
+            if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != CARRIAGE_RETURN) {
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            // Read from the newly opened file; if it is empty, the loop advances again.
+            result = in.read(b, off, len);
+        }
+        if (result > 0) {
+            lastByte = b[off + result - 1];
+        }
+        return result;
+    }
+
+    /**
+     * Skips the rest of the current file and moves on to the next one, if any.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean skipError() throws Exception {
+        advance();
+        return true;
+    }
+
+    /**
+     * Closes the watcher. The currently open file, if any, is released by {@link #close()}.
+     *
+     * @return {@code true} once the watcher has been closed
+     */
+    @Override
+    public boolean stop() throws Exception {
+        watcher.close();
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
index d32a94f..7c64aa3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
@@ -30,7 +30,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.runtime.TweetGenerator;
+import org.apache.asterix.external.util.TweetGenerator;
import org.apache.hyracks.api.context.IHyracksTaskContext;
public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 14c712a..ab1f8a0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -29,8 +29,10 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.INodeResolver;
import org.apache.asterix.external.api.INodeResolverFactory;
import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
-import org.apache.asterix.external.util.DNSResolverFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.NodeResolverFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -41,15 +43,21 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
private static final long serialVersionUID = 1L;
- protected static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+ protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
protected static INodeResolver nodeResolver;
protected Map<String, String> configuration;
- protected FileSplit[] fileSplits;
+ protected FileSplit[] inputFileSplits;
+ protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+ protected boolean isFeed;
+ protected String expression;
+ // transient fields (They don't need to be serialized and transferred)
+ private transient AlgebricksAbsolutePartitionConstraint constraints;
@Override
public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
- return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition);
+ return new LocalFSInputStreamProvider(inputFileSplits, ctx, configuration, partition, expression, isFeed,
+ feedLogFileSplits);
}
@Override
@@ -67,16 +75,23 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
this.configuration = configuration;
String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
configureFileSplits(splits);
+ configurePartitionConstraint();
+ this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
+ if (isFeed) {
+ feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
+ ExternalDataUtils.getFeedName(configuration), constraints);
+ }
+ this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return configurePartitionConstraint();
+ return constraints;
}
private void configureFileSplits(String[] splits) throws AsterixException {
- if (fileSplits == null) {
- fileSplits = new FileSplit[splits.length];
+ if (inputFileSplits == null) {
+ inputFileSplits = new FileSplit[splits.length];
String nodeName;
String nodeLocalPath;
int count = 0;
@@ -90,19 +105,19 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
nodeName = trimmedValue.split(":")[0];
nodeLocalPath = trimmedValue.split("://")[1];
FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
- fileSplits[count++] = fileSplit;
+ inputFileSplits[count++] = fileSplit;
}
}
}
- private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
- String[] locs = new String[fileSplits.length];
+ private void configurePartitionConstraint() throws AsterixException {
+ String[] locs = new String[inputFileSplits.length];
String location;
- for (int i = 0; i < fileSplits.length; i++) {
- location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+ for (int i = 0; i < inputFileSplits.length; i++) {
+ location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
locs[i] = location;
}
- return new AlgebricksAbsolutePartitionConstraint(locs);
+ constraints = new AlgebricksAbsolutePartitionConstraint(locs);
}
protected INodeResolver getNodeResolver() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index e9c15cb..fd3d9e3 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -75,7 +75,7 @@ public abstract class ExternalFunction implements IExternalFunction {
}
}
- public static ISerializerDeserializer getSerDe(Object typeInfo) {
+ public static ISerializerDeserializer<?> getSerDe(Object typeInfo) {
return AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(typeInfo);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
index 192bd4e..b6795f6 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
@@ -117,6 +117,7 @@ public class ResultCollector implements IResultCollector {
return reusableResultObjectHolder;
}
+ @SuppressWarnings("unchecked")
private void serializeResult(IAObject object) throws AsterixException {
try {
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(finfo.getReturnType())
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
index 93b4bf1..e7c1ec1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import org.apache.asterix.external.api.IJObject;
@@ -65,7 +64,6 @@ public class JObjectUtil {
/**
* Normalize an input string by removing linebreaks, and replace them with space
* Also remove non-readable special characters
- *
* @param originalString
* The input String
* @return
@@ -314,54 +312,38 @@ public class JObjectUtil {
int numberOfSchemaFields = recordType.getFieldTypes().length;
byte[] recordBits = dis.getInputStream().getArray();
boolean isExpanded = false;
- int s = dis.getInputStream().getPosition();
- int recordOffset = s;
- int openPartOffset = 0;
- int offsetArrayOffset = 0;
+ dis.getInputStream();
int[] fieldOffsets = new int[numberOfSchemaFields];
IJObject[] closedFields = new IJObject[numberOfSchemaFields];
- if (recordType == null) {
- openPartOffset = s + AInt32SerializerDeserializer.getInt(recordBits, s + 6);
- s += 8;
- isExpanded = true;
- } else {
- dis.skip(4); // reading length is not required.
- if (recordType.isOpen()) {
- isExpanded = dis.readBoolean();
- if (isExpanded) {
- openPartOffset = s + dis.readInt(); // AInt32SerializerDeserializer.getInt(recordBits, s + 6);
- } else {
- // do nothing s += 6;
- }
+ dis.skip(4); // reading length is not required.
+ if (recordType.isOpen()) {
+ isExpanded = dis.readBoolean();
+ if (isExpanded) {
+ dis.readInt();
} else {
- // do nothing s += 5;
}
+ } else {
}
if (numberOfSchemaFields > 0) {
- int numOfSchemaFields = dis.readInt(); //s += 4;
+ dis.readInt();
int nullBitMapOffset = 0;
boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(recordType);
if (hasNullableFields) {
- nullBitMapOffset = dis.getInputStream().getPosition();//s
- offsetArrayOffset = dis.getInputStream().getPosition() //s
- + (numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
- : numberOfSchemaFields / 8 + 1);
+ nullBitMapOffset = dis.getInputStream().getPosition();
+ dis.getInputStream();
} else {
- offsetArrayOffset = dis.getInputStream().getPosition();
+ dis.getInputStream();
}
for (int i = 0; i < numberOfSchemaFields; i++) {
- fieldOffsets[i] = dis.readInt(); // AInt32SerializerDeserializer.getInt(recordBits, offsetArrayOffset) + recordOffset;
- // offsetArrayOffset += 4;
+ fieldOffsets[i] = dis.readInt();
}
for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
if (hasNullableFields) {
byte b1 = recordBits[nullBitMapOffset + fieldNumber / 8];
int p = 1 << (7 - (fieldNumber % 8));
if ((b1 & p) == 0) {
- // set null value (including type tag inside)
- //fieldValues.add(nullReference);
continue;
}
}
@@ -373,8 +355,6 @@ public class JObjectUtil {
if (((AUnionType) fieldTypes[fieldNumber]).isNullableType()) {
fieldType = ((AUnionType) fieldTypes[fieldNumber]).getNullableType();
fieldValueTypeTag = fieldType.getTypeTag();
- // fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBits,
- // fieldOffsets[fieldNumber], typeTag, false);
}
} else {
fieldValueTypeTag = fieldTypes[fieldNumber].getTypeTag();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
new file mode 100644
index 0000000..aed8bb9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * A single activity operator that provides the functionality of scanning data using an
+ * instance of the configured adapter.
+ */
+public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Factory used on each partition to create the adapter that produces the scanned data. */
+    private final IAdapterFactory adapterFactory;
+
+    /**
+     * @param spec
+     *            the job specification this operator belongs to
+     * @param rDesc
+     *            descriptor of the records emitted on the single output
+     * @param dataSourceAdapterFactory
+     *            factory used to create the data source adapter on each partition
+     */
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+            IAdapterFactory dataSourceAdapterFactory) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactory = dataSourceAdapterFactory;
+    }
+
+    /**
+     * Creates a source runtime that opens the output writer, creates an adapter for {@code partition}
+     * and lets the adapter push its data into the writer. The writer is always closed; when anything
+     * fails, the writer is failed first and the failure is rethrown as a {@link HyracksDataException}.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    writer.open();
+                    IDataSourceAdapter adapter = adapterFactory.createAdapter(ctx, partition);
+                    adapter.start(partition, writer);
+                } catch (Throwable th) {
+                    try {
+                        writer.fail();
+                    } catch (Throwable failure) {
+                        // keep the root cause; record the secondary failure instead of replacing it
+                        th.addSuppressed(failure);
+                    }
+                    // do not wrap an exception that already has the declared type
+                    throw th instanceof HyracksDataException ? (HyracksDataException) th
+                            : new HyracksDataException(th);
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 59ad076..82ca715 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.operators;
-import java.io.File;
import java.util.List;
import org.apache.hyracks.api.context.IHyracksTaskContext;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
new file mode 100644
index 0000000..a929eec
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator that collects the tuples of a source feed on behalf of a feed connection. Its runtime
+ * ({@link FeedCollectOperatorNodePushable}) subscribes to the source feed's runtime on the same partition,
+ * either at the intake stage or at the compute stage, as selected by {@link #getSubscriptionLocation()}.
+ */
+public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
+
+    /** Maximum number of additional lookups while waiting for the source intake runtime to appear. */
+    private static final int MAX_INTAKE_WAIT_CYCLES = 10;
+
+    /** Pause between two lookups of the source intake runtime, in milliseconds. */
+    private static final long INTAKE_WAIT_INTERVAL_MS = 2000L;
+
+    /** The type associated with the ADM data output from the feed adaptor */
+    private final IAType outputType;
+
+    /** unique identifier for a feed instance. */
+    private final FeedConnectionId connectionId;
+
+    /** Map representation of policy parameters */
+    private final Map<String, String> feedPolicyProperties;
+
+    /** The source feed from which the feed derives its data. **/
+    private final FeedId sourceFeedId;
+
+    /** The subscription location at which the recipient feed receives tuples from the source feed **/
+    private final ConnectionLocation subscriptionLocation;
+
+    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
+            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+            ConnectionLocation subscriptionLocation) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.outputType = atype;
+        this.connectionId = feedConnectionId;
+        this.feedPolicyProperties = feedPolicyProperties;
+        this.sourceFeedId = sourceFeedId;
+        this.subscriptionLocation = subscriptionLocation;
+    }
+
+    /**
+     * Looks up the source feed runtime for {@code partition} at {@link #getSubscriptionLocation()} and returns a
+     * collect runtime subscribed to it. An intake runtime that is not registered yet is waited for (bounded by
+     * {@link #MAX_INTAKE_WAIT_CYCLES}); a compute runtime must already be registered.
+     *
+     * @throws HyracksDataException
+     *             if the source runtime cannot be found, the collect runtime cannot be created, or the
+     *             subscription location is not supported
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        // Kept local rather than in a field: the descriptor is serializable and only needs the manager here.
+        IFeedSubscriptionManager subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager())
+                .getFeedSubscriptionManager();
+        switch (subscriptionLocation) {
+            case SOURCE_FEED_INTAKE_STAGE:
+                try {
+                    SubscribableFeedRuntimeId intakeRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                            FeedRuntimeType.INTAKE, partition);
+                    ISubscribableRuntime intakeRuntime = getIntakeRuntime(subscriptionManager, intakeRuntimeId);
+                    if (intakeRuntime == null) {
+                        throw new HyracksDataException(
+                                "Source intake task not found for source feed id " + sourceFeedId);
+                    }
+                    return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+                            feedPolicyProperties, partition, nPartitions, intakeRuntime);
+                } catch (Exception exception) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Initialization of the feed adaptor failed with exception",
+                                exception);
+                    }
+                    throw new HyracksDataException("Initialization of the feed adapter failed", exception);
+                }
+            case SOURCE_FEED_COMPUTE_STAGE: {
+                SubscribableFeedRuntimeId computeRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                        FeedRuntimeType.COMPUTE, partition);
+                ISubscribableRuntime computeRuntime = subscriptionManager.getSubscribableRuntime(computeRuntimeId);
+                if (computeRuntime == null) {
+                    throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
+                            + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
+                }
+                return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties,
+                        partition, nPartitions, computeRuntime);
+            }
+            default:
+                // without this, an unhandled location would silently produce no runtime (null)
+                throw new HyracksDataException("Unsupported subscription location " + subscriptionLocation);
+        }
+    }
+
+    public FeedConnectionId getFeedConnectionId() {
+        return connectionId;
+    }
+
+    public Map<String, String> getFeedPolicyProperties() {
+        return feedPolicyProperties;
+    }
+
+    public IAType getOutputType() {
+        return outputType;
+    }
+
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptors[0];
+    }
+
+    public FeedId getSourceFeedId() {
+        return sourceFeedId;
+    }
+
+    /**
+     * Looks up the intake runtime, re-checking up to {@link #MAX_INTAKE_WAIT_CYCLES} times with a pause of
+     * {@link #INTAKE_WAIT_INTERVAL_MS} between checks while it is not registered.
+     *
+     * @return the intake runtime, or {@code null} if it did not appear in time or the wait was interrupted
+     */
+    private IngestionRuntime getIntakeRuntime(IFeedSubscriptionManager subscriptionManager,
+            SubscribableFeedRuntimeId subscribableRuntimeId) {
+        int waitCycleCount = 0;
+        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        while (ingestionRuntime == null && waitCycleCount < MAX_INTAKE_WAIT_CYCLES) {
+            try {
+                Thread.sleep(INTAKE_WAIT_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // restore the interrupt status instead of swallowing it; the caller reports the missing runtime
+                Thread.currentThread().interrupt();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING,
+                            "Interrupted while waiting for ingestion runtime " + subscribableRuntimeId, e);
+                }
+                break;
+            }
+            waitCycleCount++;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
+            }
+            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        }
+        return (IngestionRuntime) ingestionRuntime;
+    }
+
+    public ConnectionLocation getSubscriptionLocation() {
+        return subscriptionLocation;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
new file mode 100644
index 0000000..8916af6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.dataflow.CollectTransformFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for {@link FeedCollectOperatorDescriptor}. It attaches a {@link CollectionRuntime} to a source
+ * feed runtime (an intake or a compute stage) and pushes the collected frames to this operator's output
+ * writer until the collection is over.
+ */
+public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
+
+    private final int partition;
+    private final FeedConnectionId connectionId;
+    private final Map<String, String> feedPolicy;
+    private final FeedPolicyAccessor policyAccessor;
+    private final IFeedManager feedManager;
+    private final ISubscribableRuntime sourceRuntime;
+    private final IHyracksTaskContext ctx;
+    private final int nPartitions;
+
+    /** This operator's output descriptor, captured in initialize() before {@code recordDesc} may be re-pointed. */
+    private RecordDescriptor outputRecordDescriptor;
+    private FeedRuntimeInputHandler inputSideHandler;
+    private CollectionRuntime collectRuntime;
+
+    /**
+     * @param ctx
+     *            task context, also used to reach the application's feed manager
+     * @param sourceFeedId
+     *            the source feed (not used by this runtime)
+     * @param feedConnectionId
+     *            the feed connection this collector belongs to
+     * @param feedPolicy
+     *            the feed policy parameters
+     * @param partition
+     *            the partition served by this runtime
+     * @param nPartitions
+     *            the total number of partitions
+     * @param sourceRuntime
+     *            the runtime of the source feed to subscribe to
+     */
+    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
+            FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
+            ISubscribableRuntime sourceRuntime) {
+        this.ctx = ctx;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.sourceRuntime = sourceRuntime;
+        this.feedPolicy = feedPolicy;
+        policyAccessor = new FeedPolicyAccessor(feedPolicy);
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+    }
+
+    /**
+     * Subscribes to the source runtime and blocks until the collection is over. On {@code FINISHED}, the
+     * collect runtime is deregistered and both the output writer and the input handler are closed. On
+     * {@code HANDOVER}, the input handler is put into {@link Mode#STALL} and only the output writer is closed;
+     * the collect runtime stays registered, so a later initialize() can revive it (see reviveOldFeed).
+     */
+    @Override
+    public void initialize() throws HyracksDataException {
+        try {
+            outputRecordDescriptor = recordDesc;
+            FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
+                    .getFeedRuntimeType();
+            switch (sourceRuntimeType) {
+                case INTAKE:
+                    handleCompleteConnection();
+                    break;
+                case COMPUTE:
+                    handlePartialConnection();
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+            }
+
+            State state = collectRuntime.waitTillCollectionOver();
+            if (state.equals(State.FINISHED)) {
+                feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+                        collectRuntime.getRuntimeId());
+                writer.close();
+                inputSideHandler.close();
+            } else if (state.equals(State.HANDOVER)) {
+                inputSideHandler.setMode(Mode.STALL);
+                writer.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
+                            + " and the output writer " + writer + " has been closed ");
+                }
+            }
+        } catch (InterruptedException ie) {
+            handleInterruptedException(ie);
+        } catch (HyracksDataException e) {
+            // already the declared type: log and rethrow without another layer of wrapping
+            LOGGER.log(Level.SEVERE, "Feed collect operator failed for connection " + connectionId, e);
+            throw e;
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Feed collect operator failed for connection " + connectionId, e);
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Handles a source at the intake stage: revives the collect runtime still registered for this connection
+     * and partition, or starts a new one when none is registered.
+     */
+    private void handleCompleteConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
+                runtimeId);
+        if (collectRuntime == null) {
+            beginNewFeed(runtimeId);
+        } else {
+            reviveOldFeed();
+        }
+    }
+
+    /** Opens the writer, builds and registers a new collect runtime, and subscribes it to the source. */
+    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
+        writer.open();
+        IFrameWriter outputSideWriter = writer;
+        // NOTE(review): beginNewFeed is only reached for INTAKE sources (via handleCompleteConnection), so this
+        // COMPUTE branch looks unreachable - confirm before removing it.
+        if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()
+                .equals(FeedRuntimeType.COMPUTE)) {
+            outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
+                    connectionId);
+            this.recordDesc = sourceRuntime.getRecordDescriptor();
+        }
+
+        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
+        inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter,
+                policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
+                sourceRuntime, feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    /**
+     * Reactivates a registered collect runtime: points its output at this operator's (newly opened) writer and
+     * switches its input handler back to {@link Mode#PROCESS}.
+     */
+    private void reviveOldFeed() throws HyracksDataException {
+        writer.open();
+        collectRuntime.getFrameCollector().setState(State.ACTIVE);
+        inputSideHandler = collectRuntime.getInputHandler();
+
+        IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
+        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
+            ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
+        } else {
+            inputSideHandler.setCoreOperator(writer);
+        }
+
+        inputSideHandler.setMode(Mode.PROCESS);
+    }
+
+    /** Handles a source at the compute stage: frames pass through a {@link CollectTransformFeedFrameWriter}. */
+    private void handlePartialConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        writer.open();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
+        }
+        IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
+                outputRecordDescriptor, connectionId);
+
+        // NOTE(review): unlike beginNewFeed, recordDesc is switched to the source's descriptor only after the
+        // input handler's tuple accessor was built from the previous value - confirm this ordering is intended.
+        inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
+                new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
+                feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        recordDesc = sourceRuntime.getRecordDescriptor();
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    /**
+     * If the feed policy continues on hardware failure, stalls the input handler and returns normally;
+     * otherwise deregisters the collect runtime, closes the writer and rethrows as {@link HyracksDataException}.
+     */
+    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
+        if (policyAccessor.continueOnHardwareFailure()) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
+                        + " until failure is resolved");
+            }
+            inputSideHandler.setMode(Mode.STALL);
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
+                        + " as feed is not configured to handle failures");
+            }
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+            writer.close();
+            throw new HyracksDataException(ie);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..a18ebcd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * An operator responsible for establishing connection with external data source and parsing,
+ * translating the received content. It uses an instance of feed adaptor to perform these functions.
+ */
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
+
+    /** The unique identifier of the feed that is being ingested. **/
+    private final FeedId feedId;
+
+    /** Feed policy handed to each intake runtime. */
+    private final FeedPolicyAccessor policyAccessor;
+
+    /**
+     * The adaptor factory that is used to create an instance of the feed adaptor. Supplied directly for built-in
+     * adapters; for external adapters it is created on the first call to {@link #createPushRuntime}.
+     **/
+    private IAdapterFactory adaptorFactory;
+
+    /** The library that contains the adapter in use ({@code null} for built-in adapters). **/
+    private final String adaptorLibraryName;
+
+    /**
+     * The adapter factory class that is used to create an instance of the feed adapter.
+     * This value is used only in the case of external adapters.
+     **/
+    private final String adaptorFactoryClassName;
+
+    /** The configuration parameters associated with the adapter ({@code null} for built-in adapters). **/
+    private final Map<String, String> adaptorConfiguration;
+
+    /** The record type the adapter produces; passed to an external factory's configure call. */
+    private final ARecordType adapterOutputType;
+
+    /** Creates an intake operator for a built-in adapter whose factory is already available. */
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
+            ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
+        super(spec, 0, 1);
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactory = adapterFactory;
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
+        // the factory is supplied directly, so nothing has to be loaded from a library
+        this.adaptorLibraryName = null;
+        this.adaptorFactoryClassName = null;
+        this.adaptorConfiguration = null;
+    }
+
+    /** Creates an intake operator for an adapter whose factory class is loaded from an external library. */
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, String adapterLibraryName,
+            String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
+        super(spec, 0, 1);
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactoryClassName = adapterFactoryClassName;
+        this.adaptorLibraryName = adapterLibraryName;
+        this.adaptorConfiguration = primaryFeed.getAdapterConfiguration();
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
+    }
+
+    /**
+     * Creates the intake runtime for {@code partition}, creating the external adapter factory first if needed.
+     *
+     * @throws HyracksDataException
+     *             if the external adapter factory cannot be created
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        IFeedSubscriptionManager feedSubscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager())
+                .getFeedSubscriptionManager();
+        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+                partition);
+        // presumably null when no intake runtime has been registered for this feed/partition yet
+        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
+                .getSubscribableRuntime(feedIngestionId);
+        if (adaptorFactory == null) {
+            try {
+                adaptorFactory = createExternalAdapterFactory();
+            } catch (Exception exception) {
+                throw new HyracksDataException(exception);
+            }
+        }
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
+                policyAccessor);
+    }
+
+    /**
+     * Loads {@link #adaptorFactoryClassName} through the class loader of {@link #adaptorLibraryName},
+     * instantiates it and configures it with the adapter configuration and output type.
+     *
+     * @throws IllegalArgumentException
+     *             if no class loader is configured for the library in the feed's dataverse
+     * @throws Exception
+     *             if the class cannot be loaded, instantiated or configured
+     */
+    private IAdapterFactory createExternalAdapterFactory() throws Exception {
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+                adaptorLibraryName);
+        if (classLoader == null) {
+            String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
+                    + " in dataverse " + feedId.getDataverse();
+            LOGGER.severe(message);
+            throw new IllegalArgumentException(message);
+        }
+        IAdapterFactory adapterFactory = (IAdapterFactory) classLoader.loadClass(adaptorFactoryClassName)
+                .getDeclaredConstructor().newInstance();
+        adapterFactory.configure(adaptorConfiguration, adapterOutputType);
+        return adapterFactory;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..b31f2bf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}.
+ * Provides the core functionality to set up the artifacts for ingestion of a feed.
+ * The artifacts are lazily activated when a feed receives a subscription request.
+ */
+public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+ private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+
+ private final FeedId feedId;
+ private final int partition;
+ private final IFeedSubscriptionManager feedSubscriptionManager;
+ private final IFeedManager feedManager;
+ private final IHyracksTaskContext ctx;
+ private final IAdapterFactory adapterFactory;
+
+ private IngestionRuntime ingestionRuntime;
+ private IFeedAdapter adapter;
+ private IIntakeProgressTracker tracker;
+ private DistributeFeedFrameWriter feedFrameWriter;
+
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
+ int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
+ this.ctx = ctx;
+ this.feedId = feedId;
+ this.partition = partition;
+ this.ingestionRuntime = ingestionRuntime;
+ this.adapterFactory = adapterFactory;
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+ this.feedSubscriptionManager = feedManager.getFeedSubscriptionManager();
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ IAdapterRuntimeManager adapterRuntimeManager = null;
+ try {
+ if (ingestionRuntime == null) {
+ try {
+ adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ //TODO: Fix record tracking
+ // if (adapterFactory.isRecordTrackingEnabled()) {
+ // tracker = adapterFactory.createIntakeProgressTracker();
+ // }
+ } catch (Exception e) {
+ LOGGER.severe("Unable to create adapter : " + adapterFactory.getAlias() + "[" + partition + "]"
+ + " Exception " + e);
+ throw new HyracksDataException(e);
+ }
+ FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+ feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
+ fta, feedManager);
+ adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
+ SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+ partition);
+ ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
+ adapterRuntimeManager);
+ feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
+ feedFrameWriter.open();
+ } else {
+ if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+ ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
+ adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
+ + ingestionRuntime);
+ LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+ + " connected to backend for feed " + feedId);
+ }
+ feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
+ } else {
+ String message = "Feed Ingestion Runtime for feed " + feedId
+ + " is already registered and is active!.";
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+ }
+
+ waitTillIngestionIsOver(adapterRuntimeManager);
+ feedSubscriptionManager
+ .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
+ if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
+ throw new HyracksDataException("Unable to ingest data");
+ }
+
+ } catch (InterruptedException ie) {
+ /*
+ * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
+ * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
+ * The surviving intake partitions must continue to live and receive data from the external source.
+ */
+ List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
+ FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
+ boolean needToHandleFailure = false;
+ List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
+ for (ISubscriberRuntime subscriber : subscribers) {
+ policyAccessor.reset(subscriber.getFeedPolicy());
+ if (!policyAccessor.continueOnHardwareFailure()) {
+ failingSubscribers.add(subscriber);
+ } else {
+ needToHandleFailure = true;
+ }
+ }
+
+ for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
+ try {
+ ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(
+ "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
+ }
+ }
+ }
+
+ if (needToHandleFailure) {
+ ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(
+ "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
+ }
+ feedSubscriptionManager
+ .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
+ throw new HyracksDataException(ie);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ } finally {
+ if (ingestionRuntime != null
+ && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+ feedFrameWriter.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
+ + ingestionRuntime.getAdapterRuntimeManager().getState());
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
+ + " Will resume after correcting failure");
+ }
+ }
+
+ }
+ }
+
+ private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
+ }
+ synchronized (adapterRuntimeManager) {
+ while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION)
+ || (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
+ adapterRuntimeManager.wait();
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+ + " done with ingestion of feed " + feedId);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
new file mode 100644
index 0000000..219110f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Sends a control message to the registered message queue for the feed identified by its connection id.
+ * <p>
+ * The descriptor itself only carries the connection id and the message. The sending is done by the
+ * {@link FeedMessageOperatorNodePushable} created for each partition.
+ */
+public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Connection whose feed the message is addressed to. */
+    private final FeedConnectionId connectionId;
+
+    /** The control message to deliver. */
+    private final IFeedMessage feedMessage;
+
+    /**
+     * @param spec         the job specification this operator belongs to
+     * @param connectionId the feed connection the message targets
+     * @param feedMessage  the control message to send
+     */
+    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
+            IFeedMessage feedMessage) {
+        super(spec, 0, 1); // input arity 0, output arity 1
+        this.feedMessage = feedMessage;
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Creates the per-partition runtime that delivers {@link #feedMessage} for {@link #connectionId}.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        IOperatorNodePushable pushable = new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage,
+                partition, nPartitions);
+        return pushable;
+    }
+
+}