You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:52:15 UTC
[2/5] incubator-asterixdb git commit: Add flush() to IFrameWriter
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
new file mode 100644
index 0000000..f38c2cb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+/**
+ * Creates record readers that deliver Twitter {@link Status} objects.
+ * <p>
+ * {@link #configure(Map)} selects the mode. Pull mode yields a {@link TwitterPullRecordReader}; it
+ * requires a search query and accepts an optional interval in seconds. Push mode yields a
+ * {@link TwitterPushRecordReader}. Intake always runs with a single partition.
+ */
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
+
+    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+
+    private Map<String, String> configuration;
+    private boolean pull;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    }
+
+    /**
+     * Validates the adapter parameters and keeps them for later reader creation.
+     * <p>
+     * The map is retained by reference and may be modified in place. It is first passed to
+     * {@code TwitterUtil.initializeConfigurationWithAuthInfo} (presumably to fill in credentials).
+     * In pull mode, a default interval is inserted when none was supplied.
+     *
+     * @param configuration adapter parameters
+     * @throws AsterixException if an OAuth credential is missing, if pull mode has no query, or if
+     *             neither pull nor push mode is specified
+     * @throws IllegalArgumentException if the interval parameter is not an integer
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+        if (!validateConfiguration(configuration)) {
+            StringBuilder builder = new StringBuilder();
+            builder.append("One or more parameters are missing from adapter configuration\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+            // Same exception type as the other configuration errors in this method, not a raw Exception.
+            throw new AsterixException(builder.toString());
+        }
+        if (ExternalDataUtils.isPull(configuration)) {
+            pull = true;
+            if (configuration.get(SearchAPIConstants.QUERY) == null) {
+                throw new AsterixException(
+                        "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
+            }
+            String interval = configuration.get(SearchAPIConstants.INTERVAL);
+            if (interval != null) {
+                try {
+                    Integer.parseInt(interval);
+                } catch (NumberFormatException nfe) {
+                    // Keep the parse failure as the cause so the offending value is visible in traces.
+                    throw new IllegalArgumentException(
+                            "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number",
+                            nfe);
+                }
+            } else {
+                // Readers receive this same map, so the default becomes visible to them.
+                configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+                            + DEFAULT_INTERVAL + ")");
+                }
+            }
+        } else if (ExternalDataUtils.isPush(configuration)) {
+            pull = false;
+        } else {
+            throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
+                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates a pull or push reader, according to the mode chosen in {@link #configure(Map)}, and
+     * configures it with the stored parameters.
+     */
+    @Override
+    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        IRecordReader<Status> reader;
+        if (pull) {
+            reader = new TwitterPullRecordReader();
+        } else {
+            reader = new TwitterPushRecordReader();
+        }
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+    /** Returns {@code true} when all four OAuth credentials are present in the configuration. */
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        return configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY) != null
+                && configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET) != null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 73f6195..ce65249 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -20,9 +20,14 @@ package org.apache.asterix.external.input.stream;
import java.io.InputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
+/**
+ * Base {@link InputStream} for external data sources, with extra control hooks: error skipping,
+ * stopping, and attaching a feed data-flow controller.
+ */
public abstract class AInputStream extends InputStream {
+ /**
+ * Asks the stream whether it can skip past an error.
+ * NOTE(review): the exact meaning of the result is defined by callers not shown here; confirm.
+ */
public abstract boolean skipError() throws Exception;
+ /**
+ * Requests that the stream stop producing data.
+ * NOTE(review): the meaning of the boolean result is implementation-defined; confirm with callers.
+ */
public abstract boolean stop() throws Exception;
+ /**
+ * Attaches a feed data-flow controller to this stream. Controllers are not supported by default;
+ * subclasses that support them override this method.
+ *
+ * @param controller the controller to attach
+ * @throws UnsupportedOperationException always, unless overridden
+ */
+ public void setController(AbstractFeedDataFlowController controller) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 7ba6032..25418b0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.input.stream;
import java.io.IOException;
import java.io.InputStreamReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
public class AInputStreamReader extends InputStreamReader {
private AInputStream in;
@@ -40,4 +42,8 @@ public class AInputStreamReader extends InputStreamReader {
throw new IOException(e);
}
}
+
+ /**
+ * Hands the feed data-flow controller to the wrapped {@link AInputStream}.
+ *
+ * @param controller controller to attach to the underlying stream
+ * @throws UnsupportedOperationException if the wrapped stream keeps the AInputStream default
+ */
+ public void setController(AbstractFeedDataFlowController controller) {
+ final AInputStream target = this.in;
+ target.setController(controller);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
deleted file mode 100644
index 8f4c094..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * Pre-move version of the HDFS input stream provider. It exposes the Text records of this
- * HDFSRecordReader as a byte stream, emitting each record followed by a line feed.
- */
-public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
-
- public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
- super(read, inputSplits, readSchedule, nodeName, conf);
- value = new Text();
- configure(configuration);
- // With a snapshot (external indexing), install an indexer and reset it while splits remain.
- if (snapshot != null) {
- setSnapshot(snapshot);
- setIndexer(ExternalIndexerProvider.getIndexer(configuration));
- if (currentSplitIndex < snapshot.size()) {
- indexer.reset(this);
- }
- }
- }
-
- @Override
- public AInputStream getInputStream() throws Exception {
- return new HDFSInputStream();
- }
-
- // Byte view over the enclosing reader's current `value`. pos indexes into the record;
- // pos == length means the trailing line feed is next.
- private class HDFSInputStream extends AInputStream {
- int pos = 0;
-
- // NOTE(review): returns a sign-extended byte, so bytes >= 0x80 are negative and 0xFF reads as -1 (EOF).
- // NOTE(review): value starts empty, so (unless something filled it) the first call returns a line feed.
- // NOTE(review): if readMore() yields an empty record, getBytes()[0] is returned instead of a line feed.
- @Override
- public int read() throws IOException {
- if (value.getLength() < pos) {
- if (!readMore()) {
- return -1;
- }
- } else if (value.getLength() == pos) {
- pos++;
- return ExternalDataConstants.BYTE_LF;
- }
- return value.getBytes()[pos++];
- }
-
- // Copies up to len bytes of the current record (plus its line feed when it fits) into buffer.
- // NOTE(review): when len == length - pos, pos ends at length and the next read(byte[]) fetches a
- // new record, so this record's line feed is dropped.
- private int readRecord(byte[] buffer, int offset, int len) {
- int actualLength = value.getLength() + 1;
- if ((actualLength - pos) > len) {
- //copy partial record
- System.arraycopy(value.getBytes(), pos, buffer, offset, len);
- pos += len;
- return len;
- } else {
- int numBytes = value.getLength() - pos;
- System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
- buffer[offset + numBytes] = ExternalDataConstants.LF;
- pos += numBytes;
- numBytes++;
- return numBytes;
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (value.getLength() > pos) {
- return readRecord(buffer, offset, len);
- }
- if (!readMore()) {
- return -1;
- }
- return readRecord(buffer, offset, len);
- }
-
- // Advances the enclosing reader to its next record.
- // NOTE(review): pos is reset to 0 even when hasNext() returns false, so a later read can
- // re-emit the last record.
- private boolean readMore() throws IOException {
- try {
- pos = 0;
- return HDFSInputStreamProvider.this.hasNext();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean skipError() throws Exception {
- return true;
- }
-
- @Override
- public boolean stop() throws Exception {
- return false;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
deleted file mode 100644
index 22d0a87..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-/**
- * Pre-move version of the local file-system input stream provider. It opens a
- * LocalFileSystemInputStream on the file of this partition's split.
- */
-public class LocalFSInputStreamProvider implements IInputStreamProvider {
-
- private String expression;
- private boolean isFeed;
- private Path path;
- // Feed log file for this partition; stays null when no feed-log splits were supplied.
- private File feedLogFile;
-
- // NOTE(review): `configuration` is not used by this constructor.
- public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
- Map<String, String> configuration, int partition, String expression, boolean isFeed,
- FileSplit[] feedLogFileSplits) {
- this.expression = expression;
- this.isFeed = isFeed;
- this.path = fileSplits[partition].getLocalFile().getFile().toPath();
- // Resolve the partition's feed-log split to an absolute file via the task's IO manager.
- if (feedLogFileSplits != null) {
- this.feedLogFile = FeedUtils
- .getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
- feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager())
- .getFile();
-
- }
- }
-
- // Creates a new stream on each call; a FeedLogManager is attached only for feeds that have a log file.
- @Override
- public AInputStream getInputStream() throws IOException {
- FeedLogManager feedLogManager = null;
- if (isFeed && feedLogFile != null) {
- feedLogManager = new FeedLogManager(feedLogFile);
- }
- return new LocalFileSystemInputStream(path, expression, feedLogManager, isFeed);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 7eebe4c..7b7cd8b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
@@ -39,6 +40,11 @@ public class LocalFileSystemInputStream extends AInputStream {
}
@Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ // The controller goes to the watcher rather than being stored by this stream.
+ // NOTE(review): assumes `watcher` is non-null whenever this is called; confirm for non-feed streams.
+ this.watcher.setController(controller);
+ }
+
+ @Override
public void close() throws IOException {
IOException ioe = null;
if (in != null) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
deleted file mode 100644
index 1f920e9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.net.ServerSocket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-
-/**
- * Pre-move version of the socket input stream provider. It wraps a ServerSocket and hands out
- * SocketInputStreams over it.
- */
-public class SocketInputStreamProvider implements IInputStreamProvider {
- private ServerSocket server;
-
- public SocketInputStreamProvider(ServerSocket server) {
- this.server = server;
- }
-
- // Each call creates a new SocketInputStream over the same server socket.
- @Override
- public AInputStream getInputStream() throws Exception {
- return new SocketInputStream(server);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
deleted file mode 100644
index 7c64aa3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Provides a stream of synthetic tweets. A TweetGenerator is registered on a PipedOutputStream and
- * driven by a DataProvider on an executor thread. The connected PipedInputStream is exposed to the
- * caller through TwitterServer.
- */
-public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
-
- private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
-
- // NOTE(review): this cached thread pool is never shut down in this class; confirm its threads are reclaimed.
- private ExecutorService executorService;
-
- private PipedOutputStream outputStream;
-
- private PipedInputStream inputStream;
-
- private TwitterServer twitterServer;
-
- // `ctx` is not used here; `partition` is forwarded to the tweet generator.
- public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
- throws Exception {
- executorService = Executors.newCachedThreadPool();
- outputStream = new PipedOutputStream();
- inputStream = new PipedInputStream(outputStream);
- twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
- }
-
- // NOTE(review): start() runs here but TwitterServer.started stays false, so the first read() calls
- // start() again and submits the same DataProvider a second time. Two threads would then feed the
- // same pipe. Confirm whether this double start is intended.
- @Override
- public AInputStream getInputStream() throws Exception {
- twitterServer.start();
- return twitterServer;
- }
-
- // AInputStream that reads from the pipe and starts the DataProvider on the first read if needed.
- private static class TwitterServer extends AInputStream {
- private final DataProvider dataProvider;
- private final ExecutorService executorService;
- private InputStream in;
- private boolean started;
-
- public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
- ExecutorService executorService, InputStream in) throws Exception {
- dataProvider = new DataProvider(configuration, partition, os);
- this.executorService = executorService;
- this.in = in;
- this.started = false;
- }
-
- // Signals the generator loop to finish; the pipe itself is closed by DataProvider.run().
- @Override
- public boolean stop() throws IOException {
- dataProvider.stop();
- return true;
- }
-
- // Submits the DataProvider to the executor. Each call submits it again.
- public void start() {
- executorService.execute(dataProvider);
- }
-
- @Override
- public boolean skipError() throws Exception {
- return false;
- }
-
- @Override
- public int read() throws IOException {
- if (!started) {
- start();
- started = true;
- }
- return in.read();
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if (!started) {
- start();
- started = true;
- }
- return in.read(b, off, len);
- }
- }
-
- // Generates tweet batches until the generator reports no more data or stop() is called, then closes `os`.
- private static class DataProvider implements Runnable {
-
- public static final String KEY_MODE = "mode";
-
- private TweetGenerator tweetGenerator;
- // NOTE(review): written by stop() and read by run() on another thread, but not volatile,
- // so run() may not observe the change.
- private boolean continuePush = true;
- private int batchSize;
- private final Mode mode;
- private final OutputStream os;
-
- // AGGRESSIVE: batches of 5000 back to back. CONTROLLED: batches of `tps`, each padded to at least one second.
- public static enum Mode {
- AGGRESSIVE,
- CONTROLLED
- }
-
- public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
- this.tweetGenerator = new TweetGenerator(configuration, partition);
- this.tweetGenerator.registerSubscriber(os);
- this.os = os;
- mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
- : Mode.AGGRESSIVE;
- switch (mode) {
- case CONTROLLED:
- String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
- if (tpsValue == null) {
- throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
- }
- batchSize = Integer.parseInt(tpsValue);
- break;
- case AGGRESSIVE:
- batchSize = 5000;
- break;
- }
- }
-
- // NOTE(review): any exception, including InterruptedException from sleep(), is logged by message
- // only and the outer loop retries. If the failure repeats (e.g. os.close() keeps throwing), this
- // loops forever, and the interrupt status is lost.
- @Override
- public void run() {
- boolean moreData = true;
- long startBatch;
- long endBatch;
-
- while (true) {
- try {
- while (moreData && continuePush) {
- switch (mode) {
- case AGGRESSIVE:
- moreData = tweetGenerator.generateNextBatch(batchSize);
- break;
- case CONTROLLED:
- startBatch = System.currentTimeMillis();
- moreData = tweetGenerator.generateNextBatch(batchSize);
- endBatch = System.currentTimeMillis();
- if (endBatch - startBatch < 1000) {
- Thread.sleep(1000 - (endBatch - startBatch));
- }
- break;
- }
- }
- os.close();
- break;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in adaptor " + e.getMessage());
- }
- }
- }
- }
-
- public void stop() {
- continuePush = false;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index ab1f8a0..06833af 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -28,7 +28,7 @@ import org.apache.asterix.external.api.IInputStreamProvider;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.INodeResolver;
import org.apache.asterix.external.api.INodeResolverFactory;
-import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.LocalFSInputStreamProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index 37afa53..ea60f43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -30,7 +30,7 @@ import java.util.Set;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IInputStreamProvider;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.util.AsterixRuntimeUtil;
import org.apache.commons.lang3.StringUtils;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
new file mode 100644
index 0000000..93a1685
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Exposes the {@link Text} records of this HDFS record reader as a byte stream. Each record is
+ * emitted as its bytes followed by a single line feed.
+ */
+public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
+
+    public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
+        super(read, inputSplits, readSchedule, nodeName, conf);
+        value = new Text();
+        configure(configuration);
+        // With a snapshot (external indexing), install an indexer and reset it while splits remain.
+        if (snapshot != null) {
+            setSnapshot(snapshot);
+            setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+            if (currentSplitIndex < snapshot.size()) {
+                indexer.reset(this);
+            }
+        }
+    }
+
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        return new HDFSInputStream();
+    }
+
+    /**
+     * Byte view over the enclosing reader's records.
+     * <p>
+     * {@code pos} indexes the current record {@code value}:
+     * <ul>
+     * <li>{@code [0, length)} are record bytes;</li>
+     * <li>{@code length} means the trailing line feed is pending;</li>
+     * <li>anything larger means the record and its line feed are consumed, so the next record must
+     * be fetched.</li>
+     * </ul>
+     * Both {@code read} overloads share this convention, so they can be mixed freely.
+     */
+    private class HDFSInputStream extends AInputStream {
+        // Start in the "consumed" state so the first read fetches a record rather than emitting a
+        // line feed for the placeholder value created in the constructor.
+        private int pos = Integer.MAX_VALUE;
+
+        @Override
+        public int read() throws IOException {
+            if (!ensureRecord()) {
+                return -1;
+            }
+            if (pos == value.getLength()) {
+                pos++;
+                return ExternalDataConstants.BYTE_LF;
+            }
+            // Mask to honour the InputStream contract (0..255); a raw 0xFF byte would otherwise read as -1 (EOF).
+            return value.getBytes()[pos++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] buffer, int offset, int len) throws IOException {
+            if (len == 0) {
+                // InputStream contract: a zero-length request reads nothing and returns 0.
+                return 0;
+            }
+            if (!ensureRecord()) {
+                return -1;
+            }
+            int length = value.getLength();
+            int remaining = length - pos;
+            if (remaining >= len) {
+                // Only record bytes fit; the line feed stays pending (pos <= length) for the next call.
+                System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+                pos += len;
+                return len;
+            }
+            System.arraycopy(value.getBytes(), pos, buffer, offset, remaining);
+            buffer[offset + remaining] = ExternalDataConstants.LF;
+            pos = length + 1;
+            return remaining + 1;
+        }
+
+        /**
+         * Ensures the current record still has unread bytes or a pending line feed, fetching the
+         * next record if it does not.
+         *
+         * @return {@code false} once the underlying reader has no more records
+         */
+        private boolean ensureRecord() throws IOException {
+            return pos <= value.getLength() || readMore();
+        }
+
+        /**
+         * Advances the enclosing reader to its next record. {@code pos} is rewound only on success,
+         * so that after end of input the previous record is not emitted again.
+         */
+        private boolean readMore() throws IOException {
+            try {
+                if (HDFSInputStreamProvider.this.hasNext()) {
+                    pos = 0;
+                    return true;
+                }
+                return false;
+            } catch (IOException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return true;
+        }
+
+        @Override
+        public boolean stop() throws Exception {
+            return false;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
new file mode 100644
index 0000000..4c4edd3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Provides a {@link LocalFileSystemInputStream} over the local file of one partition's split,
+ * optionally with a feed-log file resolved through the task's IO manager.
+ */
+public class LocalFSInputStreamProvider implements IInputStreamProvider {
+
+    private final String expression;
+    private final boolean isFeed;
+    private final Path path;
+    // null when no feed-log splits were supplied
+    private final File feedLogFile;
+
+    public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
+            Map<String, String> configuration, int partition, String expression, boolean isFeed,
+            FileSplit[] feedLogFileSplits) {
+        this.expression = expression;
+        this.isFeed = isFeed;
+        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
+        if (feedLogFileSplits != null) {
+            FileSplit logSplit = feedLogFileSplits[partition];
+            this.feedLogFile = FeedUtils.getAbsoluteFileRef(logSplit.getLocalFile().getFile().getPath(),
+                    logSplit.getIODeviceId(), ctx.getIOManager()).getFile();
+        } else {
+            this.feedLogFile = null;
+        }
+    }
+
+    /**
+     * Opens a stream over this partition's file. A {@link FeedLogManager} is attached only when this
+     * is a feed and a feed-log file was configured.
+     */
+    @Override
+    public AInputStream getInputStream() throws IOException {
+        FeedLogManager feedLogManager = null;
+        if (isFeed && feedLogFile != null) {
+            feedLogManager = new FeedLogManager(feedLogFile);
+        }
+        return new LocalFileSystemInputStream(path, expression, feedLogManager, isFeed);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
new file mode 100644
index 0000000..2b12675
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketInputStream;
+
+/** Provides {@link SocketInputStream}s backed by the {@link ServerSocket} passed to the constructor. */
+public class SocketInputStreamProvider implements IInputStreamProvider {
+    private final ServerSocket server;
+
+    public SocketInputStreamProvider(ServerSocket server) {
+        this.server = server;
+    }
+
+    /** Returns a new {@link SocketInputStream} over the configured server socket. */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        return new SocketInputStream(server);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
new file mode 100644
index 0000000..06f7e72
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Provides an input stream of generated tweets. A {@link DataProvider} runs on an executor thread and
+ * writes into a pipe; the returned {@link AInputStream} reads the other end of that pipe.
+ */
+public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
+
+    private final ExecutorService executorService;
+
+    private final PipedOutputStream outputStream;
+
+    private final PipedInputStream inputStream;
+
+    private final TwitterServer twitterServer;
+
+    public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+    }
+
+    /** Starts tweet generation (at most once) and returns the stream that reads the generated data. */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        twitterServer.start();
+        return twitterServer;
+    }
+
+    /** Stream over the read end of the pipe that {@link DataProvider} writes into. */
+    private static class TwitterServer extends AInputStream {
+        private final DataProvider dataProvider;
+        private final ExecutorService executorService;
+        private final InputStream in;
+        private boolean started;
+
+        public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
+                ExecutorService executorService, InputStream in) throws Exception {
+            dataProvider = new DataProvider(configuration, partition, os);
+            this.executorService = executorService;
+            this.in = in;
+            this.started = false;
+        }
+
+        /** Asks the data provider to stop after its current batch. */
+        @Override
+        public boolean stop() throws IOException {
+            dataProvider.stop();
+            return true;
+        }
+
+        /**
+         * Submits the data provider to the executor, at most once. Both getInputStream() (eagerly) and
+         * read() (lazily) call this. The flag is set here so the data provider is never submitted
+         * twice; two submissions would mean two generator threads writing into one pipe.
+         */
+        public synchronized void start() {
+            if (!started) {
+                executorService.execute(dataProvider);
+                started = true;
+            }
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return false;
+        }
+
+        @Override
+        public int read() throws IOException {
+            start();
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            start();
+            return in.read(b, off, len);
+        }
+    }
+
+    /** Generates tweet batches into {@code os} until the generator is exhausted or stop() is called. */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        private final TweetGenerator tweetGenerator;
+        // written by stop() on one thread, read by run() on the executor thread
+        private volatile boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        public static enum Mode {
+            AGGRESSIVE,
+            CONTROLLED
+        }
+
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    batchSize = 5000;
+                    break;
+            }
+        }
+
+        /**
+         * Pushes batches until the generator reports no more data or stop() is called, then closes
+         * {@code os}. Non-interrupt failures are logged and the batch is retried, as before.
+         * An interrupt ends the loop and restores the thread's interrupt flag.
+         */
+        @Override
+        public void run() {
+            boolean moreData = true;
+            try {
+                while (moreData && continuePush) {
+                    try {
+                        moreData = generateBatch();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    } catch (Exception e) {
+                        // NOTE(review): if the pipe's reader is gone, every retry fails the same way;
+                        // consider stopping on IOException instead of retrying.
+                        logWarning(e);
+                    }
+                }
+            } finally {
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    logWarning(e);
+                }
+            }
+        }
+
+        /** Generates one batch; in CONTROLLED mode, sleeps for whatever remains of a one-second window. */
+        private boolean generateBatch() throws Exception {
+            if (mode == Mode.CONTROLLED) {
+                long startBatch = System.currentTimeMillis();
+                boolean more = tweetGenerator.generateNextBatch(batchSize);
+                long elapsed = System.currentTimeMillis() - startBatch;
+                if (elapsed < 1000) {
+                    Thread.sleep(1000 - elapsed);
+                }
+                return more;
+            }
+            return tweetGenerator.generateNextBatch(batchSize);
+        }
+
+        /** Logs {@code e} with its stack trace at WARNING. */
+        private static void logWarning(Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Exception in adaptor " + e.getMessage(), e);
+            }
+        }
+
+        /** Requests that run() stop before starting another batch. */
+        public void stop() {
+            continuePush = false;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 9e35617..2cab3a7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -80,7 +80,7 @@ public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorD
try {
adapter = adapterFactory.createAdapter(ctx, partition,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), snapshotAccessor, writer);
- //Open the file index accessor here
+ // Open the file index accessor here
snapshotAccessor.open();
indexOpen = true;
adapter.open();
@@ -127,6 +127,11 @@ public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorD
throw new HyracksDataException(th);
}
}
+
+ /** Forwards the flush to {@code adapter}. */
+ @Override
+ public void flush() throws HyracksDataException {
+ adapter.flush();
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 80a54be..fa2e513 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -25,9 +25,9 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -183,6 +183,9 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
inputSideHandler.nextFrame(null); // signal end of data
while (!inputSideHandler.isFinished()) {
synchronized (coreOperator) {
+ if (inputSideHandler.isFinished()) {
+ break;
+ }
coreOperator.wait();
}
}
@@ -192,8 +195,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
}
coreOperator.close();
System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
} finally {
if (!stalled) {
deregister();
@@ -221,4 +224,9 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
}
}
+ /** Forwards the flush to {@code inputSideHandler}. */
+ @Override
+ public void flush() throws HyracksDataException {
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
index 4dae72d..b09504a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -181,4 +181,9 @@ public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorN
}
}
+ /** Forwards the flush to {@code inputSideHandler}. */
+ @Override
+ public void flush() throws HyracksDataException {
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index f75b3eb..3c4c9ad 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -188,6 +188,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
inputSideHandler.nextFrame(null); // signal end of data
while (!inputSideHandler.isFinished()) {
synchronized (coreOperator) {
+ if (inputSideHandler.isFinished()) {
+ break;
+ }
coreOperator.wait();
}
}
@@ -195,8 +198,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
coreOperator.close();
} catch (Exception e) {
- e.printStackTrace();
- // ignore
+ throw new HyracksDataException(e);
} finally {
if (!stalled) {
deregister();
@@ -217,4 +219,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
}
+ /** Forwards the flush to {@code inputSideHandler}. */
+ @Override
+ public void flush() throws HyracksDataException {
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 129b62f..14a3e2a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -39,7 +39,6 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
@@ -67,7 +66,6 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
private AdmLexer admLexer;
private ARecordType recordType;
- private boolean datasetRec;
private boolean isStreamParser = true;
private int nullableFieldId = 0;
@@ -142,7 +140,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
public boolean parse(DataOutput out) throws AsterixException {
try {
resetPools();
- return parseAdmInstance(recordType, datasetRec, out);
+ return parseAdmInstance(recordType, out);
} catch (IOException e) {
throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
} catch (AdmLexerException e) {
@@ -163,12 +161,6 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException {
this.recordType = recordType;
this.configuration = configuration;
- String isDatasetRecordString = configuration.get(ExternalDataConstants.KEY_DATASET_RECORD);
- if (isDatasetRecordString == null) {
- this.datasetRec = true;
- } else {
- this.datasetRec = Boolean.parseBoolean(isDatasetRecordString);
- }
this.isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
if (!isStreamParser) {
this.admLexer = new AdmLexer();
@@ -180,7 +172,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
try {
resetPools();
admLexer.setBuffer(record.get());
- parseAdmInstance(recordType, datasetRec, out);
+ parseAdmInstance(recordType, out);
} catch (IOException e) {
throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
} catch (AdmLexerException e) {
@@ -201,18 +193,18 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
admLexer = new AdmLexer(new java.io.InputStreamReader(in));
}
- protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
+ protected boolean parseAdmInstance(IAType objectType, DataOutput out)
throws AsterixException, IOException, AdmLexerException {
int token = admLexer.next();
if (token == AdmLexer.TOKEN_EOF) {
return false;
} else {
- admFromLexerStream(token, objectType, out, datasetRec);
+ admFromLexerStream(token, objectType, out);
return true;
}
}
- private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
+ private void admFromLexerStream(int token, IAType objectType, DataOutput out)
throws AsterixException, IOException, AdmLexerException {
switch (token) {
@@ -441,7 +433,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
case AdmLexer.TOKEN_START_RECORD: {
if (checkType(ATypeTag.RECORD, objectType)) {
objectType = getComplexType(objectType, ATypeTag.RECORD);
- parseRecord((ARecordType) objectType, out, datasetRec);
+ parseRecord((ARecordType) objectType, out);
} else {
throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
}
@@ -567,7 +559,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
}
- private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
+ private void parseRecord(ARecordType recType, DataOutput out)
throws IOException, AsterixException, AdmLexerException {
ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
@@ -575,16 +567,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
IARecordBuilder recBuilder = getRecordBuilder();
BitSet nulls = null;
- if (datasetRec) {
-
- if (recType != null) {
- nulls = new BitSet(recType.getFieldNames().length);
- recBuilder.reset(recType);
- } else {
- recBuilder.reset(null);
- }
-
- } else if (recType != null) {
+ if (recType != null) {
+ //TODO: use BitSet Pool
nulls = new BitSet(recType.getFieldNames().length);
recBuilder.reset(recType);
} else {
@@ -650,7 +634,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
}
token = admLexer.next();
- this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
+ this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
if (openRecordField) {
if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
@@ -752,7 +736,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
expectingListItem = false;
itemBuffer.reset();
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+ admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
orderedListBuilder.addItem(itemBuffer);
}
first = false;
@@ -799,7 +783,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
} else {
expectingListItem = false;
itemBuffer.reset();
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+ admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
unorderedListBuilder.addItem(itemBuffer);
}
first = false;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 6c399c3..2f0fc86 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -28,6 +28,7 @@ import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IDataParser;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
@@ -124,18 +125,6 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
}
}
- protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
- throws HyracksDataException {
- buffer.reset();
- DataOutput out = buffer.getDataOutput();
- str.setValue(fieldName);
- try {
- stringSerde.serialize(str, out);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
@Override
public DataSourceType getDataSourceType() {
return isStreamParser ? DataSourceType.STREAM : DataSourceType.RECORDS;
@@ -173,7 +162,8 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
} else {
nameBuffers[i] = new ArrayBackedValueStorage();
- fieldNameToBytes(name, str, nameBuffers[i]);
+ str.setValue(name);
+ IDataParser.toBytes(str, nameBuffers[i], stringSerde);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
new file mode 100644
index 0000000..ecdb03d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Builds a record of {@code recordType} from a {@link RecordWithMetadata}. The field at
+ * {@code valueIndex} is produced by {@code valueParser} from the wrapped record. Every other field
+ * {@code i} is copied from metadata slot {@code metaIndexes[i]}.
+ */
+public class RecordWithMetadataParser<T> implements IRecordDataParser<RecordWithMetadata<T>> {
+
+    private final Class<? extends RecordWithMetadata<T>> clazz;
+    private final int[] metaIndexes;
+    private final int valueIndex;
+    private ARecordType recordType;
+    private IRecordDataParser<T> valueParser;
+    private RecordBuilder recBuilder;
+    // NOTE(review): populated in configure() but not read anywhere in this class
+    private ArrayBackedValueStorage[] nameBuffers;
+    private int numberOfFields;
+    private ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+
+    public RecordWithMetadataParser(Class<? extends RecordWithMetadata<T>> clazz, int[] metaIndexes,
+            IRecordDataParser<T> valueParser, int valueIndex) {
+        this.clazz = clazz;
+        this.metaIndexes = metaIndexes;
+        this.valueParser = valueParser;
+        this.valueIndex = valueIndex;
+    }
+
+    /** This parser always consumes individual records. */
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Prepares the record builder for {@code recordType} and serializes each field name. */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        this.recordType = recordType;
+        String[] fieldNames = recordType.getFieldNames();
+        this.numberOfFields = fieldNames.length;
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        nameBuffers = new ArrayBackedValueStorage[numberOfFields];
+        AMutableString fieldName = new AMutableString(null);
+        for (int idx = 0; idx < numberOfFields; idx++) {
+            nameBuffers[idx] = new ArrayBackedValueStorage();
+            fieldName.setValue(fieldNames[idx]);
+            IDataParser.toBytes(fieldName, nameBuffers[idx], stringSerde);
+        }
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return clazz;
+    }
+
+    /** Writes one record: metadata-backed fields plus the parsed wrapped record at {@code valueIndex}. */
+    @Override
+    public void parse(IRawRecord<? extends RecordWithMetadata<T>> record, DataOutput out) throws Exception {
+        recBuilder.reset(recordType);
+        valueBuffer.reset();
+        recBuilder.init();
+        RecordWithMetadata<T> source = record.get();
+        for (int field = 0; field < numberOfFields; field++) {
+            if (field != valueIndex) {
+                recBuilder.addField(field, source.getMetadata(metaIndexes[field]));
+                continue;
+            }
+            // the wrapped record itself, parsed by the delegate parser
+            valueParser.parse(source.getRecord(), valueBuffer.getDataOutput());
+            recBuilder.addField(field, valueBuffer);
+        }
+        recBuilder.write(out, true);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
new file mode 100644
index 0000000..88a0683
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Parser factory for {@link RecordWithMetadata} inputs. One field of the output record (selected by
+ * {@link ExternalDataConstants#KEY_VALUE_INDEX}) is produced by a value parser chosen through
+ * {@link ExternalDataConstants#KEY_VALUE_FORMAT}. The meta field indexes from
+ * {@link ExternalDataConstants#KEY_META_INDEXES} are handed to {@link RecordWithMetadataParser}.
+ */
+public class RecordWithMetadataParserFactory<T> implements IRecordDataParserFactory<RecordWithMetadata<T>> {
+
+    private static final long serialVersionUID = 1L;
+    private Class<? extends RecordWithMetadata<T>> recordClass;
+    private ARecordType recordType;
+    private int[] metaIndexes;
+    private IRecordDataParserFactory<T> valueParserFactory;
+    private int valueIndex;
+
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Configures this factory from the parser parameters.
+     * <p>
+     * Required parameters:
+     * <ul>
+     * <li>{@link ExternalDataConstants#KEY_META_INDEXES}: comma-separated integers</li>
+     * <li>{@link ExternalDataConstants#KEY_VALUE_INDEX}: an integer</li>
+     * <li>{@link ExternalDataConstants#KEY_VALUE_FORMAT}</li>
+     * </ul>
+     * {@link #setRecordType(ARecordType)} must be called before this method.
+     * <p>
+     * Side effect: the value format is written into {@code configuration} under
+     * {@link ExternalDataConstants#KEY_DATA_PARSER}, so the caller's map is modified. That key is
+     * then used to look up and configure the value parser factory.
+     *
+     * @throws HyracksDataException in any of these cases:
+     *         <ul>
+     *         <li>a required parameter is missing or malformed</li>
+     *         <li>the record type has not been set</li>
+     *         <li>the value index is out of range</li>
+     *         <li>the value index does not denote a record-typed field</li>
+     *         </ul>
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validation first, before the caller's configuration is modified
+        String metaIndexesParam = getRequiredParameter(configuration, ExternalDataConstants.KEY_META_INDEXES);
+        String valueIndexParam = getRequiredParameter(configuration, ExternalDataConstants.KEY_VALUE_INDEX);
+        String valueFormat = getRequiredParameter(configuration, ExternalDataConstants.KEY_VALUE_FORMAT);
+        if (recordType == null) {
+            throw new HyracksDataException("the record type must be set before configuring the parser");
+        }
+        // get meta field indexes
+        String[] stringMetaIndexes = metaIndexesParam.split(",");
+        metaIndexes = new int[stringMetaIndexes.length];
+        for (int i = 0; i < stringMetaIndexes.length; i++) {
+            metaIndexes[i] = parseIntParameter(ExternalDataConstants.KEY_META_INDEXES, stringMetaIndexes[i]);
+        }
+        // get value index and make sure it denotes a record-typed field of the output type
+        valueIndex = parseIntParameter(ExternalDataConstants.KEY_VALUE_INDEX, valueIndexParam);
+        if (valueIndex < 0 || valueIndex >= recordType.getFieldTypes().length) {
+            throw new HyracksDataException("the parser parameter (" + ExternalDataConstants.KEY_VALUE_INDEX
+                    + ") is out of range: " + valueIndex);
+        }
+        Object valueType = recordType.getFieldTypes()[valueIndex];
+        if (!(valueType instanceof ARecordType)) {
+            throw new HyracksDataException("the field at the parser parameter ("
+                    + ExternalDataConstants.KEY_VALUE_INDEX + ") = " + valueIndex + " is not of a record type");
+        }
+        // get value format: the value parser factory is resolved through the regular data parser key
+        configuration.put(ExternalDataConstants.KEY_DATA_PARSER, valueFormat);
+        valueParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
+        valueParserFactory.setRecordType((ARecordType) valueType);
+        valueParserFactory.configure(configuration);
+        recordClass = (Class<? extends RecordWithMetadata<T>>) (new RecordWithMetadata<T>(
+                valueParserFactory.getRecordClass())).getClass();
+    }
+
+    /**
+     * Returns the value of a mandatory parser parameter. A key that is absent or mapped to
+     * {@code null} is reported as missing.
+     */
+    private static String getRequiredParameter(Map<String, String> configuration, String key)
+            throws HyracksDataException {
+        String value = configuration.get(key);
+        if (value == null) {
+            throw new HyracksDataException("the parser parameter (" + key + ") is missing");
+        }
+        return value;
+    }
+
+    /**
+     * Parses one integer from a parser parameter. If the text is malformed, the error names the
+     * offending parameter.
+     */
+    private static int parseIntParameter(String key, String text) throws HyracksDataException {
+        try {
+            return Integer.parseInt(text.trim());
+        } catch (NumberFormatException e) {
+            throw new HyracksDataException("the parser parameter (" + key + ") is not a valid integer: " + text,
+                    e);
+        }
+    }
+
+    /** Sets the output record type; must be called before {@link #configure(Map)}. */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Creates a parser that combines a value parser, created by the configured value parser
+     * factory, with the configured meta field indexes.
+     */
+    @Override
+    public IRecordDataParser<RecordWithMetadata<T>> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException, IOException {
+        IRecordDataParser<T> valueParser = valueParserFactory.createRecordParser(ctx);
+        return new RecordWithMetadataParser<T>(recordClass, metaIndexes, valueParser, valueIndex);
+    }
+
+    /** Returns the record class computed by {@link #configure(Map)}; {@code null} before configuration. */
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return recordClass;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index a7ab062..745c653 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -25,9 +25,10 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -97,8 +98,11 @@ public class DatasourceFactoryProvider {
case ExternalDataConstants.READER_TWITTER_PUSH:
readerFactory = new TwitterRecordReaderFactory();
break;
+ case ExternalDataConstants.READER_COUCHBASE:
+ readerFactory = new CouchbaseReaderFactory();
+ break;
default:
- throw new AsterixException("unknown record reader factory");
+ throw new AsterixException("unknown record reader factory: " + reader);
}
}
return readerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
new file mode 100644
index 0000000..18b9cb5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSLookupReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+
+/**
+ * Creates {@link ILookupReaderFactory} instances from an external data source configuration.
+ */
+public class LookupReaderFactoryProvider {
+
+    /**
+     * Returns a configured lookup reader factory for the input format named in {@code configuration}.
+     * The supported formats are those named by these constants, and all of them are served by
+     * {@link HDFSLookupReaderFactory}:
+     * <ul>
+     * <li>{@link ExternalDataConstants#CLASS_NAME_TEXT_INPUT_FORMAT}</li>
+     * <li>{@link ExternalDataConstants#CLASS_NAME_SEQUENCE_INPUT_FORMAT}</li>
+     * <li>{@link ExternalDataConstants#CLASS_NAME_RC_INPUT_FORMAT}</li>
+     * </ul>
+     *
+     * @param configuration the external data source configuration
+     * @return a configured lookup reader factory
+     * @throws AsterixException if the input format is absent or not one of the supported formats
+     */
+    @SuppressWarnings("rawtypes")
+    public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
+        String inputFormat = HDFSUtils.getInputFormatClassName(configuration);
+        // constant-first comparison: a missing input format is reported as unrecognized instead of an NPE
+        if (ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT.equals(inputFormat)
+                || ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT.equals(inputFormat)
+                || ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT.equals(inputFormat)) {
+            HDFSLookupReaderFactory<Object> readerFactory = new HDFSLookupReaderFactory<Object>();
+            readerFactory.configure(configuration);
+            return readerFactory;
+        }
+        throw new AsterixException("Unrecognized external format: " + inputFormat);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index f5a0512..30595db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -22,10 +22,12 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
import org.apache.asterix.external.parser.factory.TweetParserFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -39,13 +41,12 @@ public class ParserFactoryProvider {
return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
parserFactoryName);
} else {
- parserFactory = ParserFactoryProvider.getParserFactory(configuration);
+ parserFactory = ParserFactoryProvider.getParserFactory(ExternalDataUtils.getRecordFormat(configuration));
}
return parserFactory;
}
- private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
- String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+ private static IDataParserFactory getParserFactory(String recordFormat) throws AsterixException {
switch (recordFormat) {
case ExternalDataConstants.FORMAT_ADM:
case ExternalDataConstants.FORMAT_JSON:
@@ -58,6 +59,8 @@ public class ParserFactoryProvider {
return new TweetParserFactory();
case ExternalDataConstants.FORMAT_RSS:
return new RSSParserFactory();
+ case ExternalDataConstants.FORMAT_RECORD_WITH_META:
+ return new RecordWithMetadataParserFactory<RecordWithMetadata<?>>();
default:
throw new AsterixException("Unknown data format");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 90c74e1..27a1d0e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -36,7 +36,7 @@ public class DataflowUtils {
public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
throws HyracksDataException {
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
+ appender.write(writer, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new HyracksDataException("Tuple is too large for a frame");
}