You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:50 UTC
[15/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master'
into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00c1eb7,0000000..3c3b8fb
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@@ -1,182 -1,0 +1,178 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
- import java.nio.file.Path;
- import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FileSystemWatcher;
- import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.log4j.Logger;
+
+public class LocalFSInputStream extends AsterixInputStream {
+
+ private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
- private final Path path;
+ private final FileSystemWatcher watcher;
+ private FileInputStream in;
+ private byte lastByte;
+ private File currentFile;
+
- public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
- final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
- throws IOException {
- this.path = fileSplits[partition].getLocalFile().getFile().toPath();
- this.watcher = new FileSystemWatcher(path, expression, isFeed);
- this.watcher.init();
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- super.setFeedLogManager(logManager);
- watcher.setFeedLogManager(logManager);
++ public LocalFSInputStream(FileSystemWatcher watcher) {
++ this.watcher = watcher;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ super.setController(controller);
- watcher.setController(controller);
+ }
+
+ @Override
++ public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
++ super.setFeedLogManager(logManager);
++ watcher.setFeedLogManager(logManager);
++ };
++
++ @Override
+ public void close() throws IOException {
+ IOException ioe = null;
+ if (in != null) {
+ try {
+ closeFile();
+ } catch (Exception e) {
+ ioe = new IOException(e);
+ }
+ }
+ try {
+ watcher.close();
+ } catch (Exception e) {
+ if (ioe == null) {
+ throw e;
+ }
+ ioe.addSuppressed(e);
+ throw ioe;
+ }
+ }
+
+ private void closeFile() throws IOException {
+ if (in != null) {
++ if (logManager != null) {
++ logManager.endPartition(currentFile.getAbsolutePath());
++ }
+ try {
+ in.close();
+ } finally {
+ in = null;
+ currentFile = null;
+ }
+ }
+ }
+
+ /**
+ * Closes the current input stream and opens the next one, if any.
+ */
+ private boolean advance() throws IOException {
+ closeFile();
- if (watcher.hasNext()) {
- currentFile = watcher.next();
++ currentFile = watcher.poll();
++ if (currentFile == null) {
++ if (controller != null) {
++ controller.flush();
++ }
++ currentFile = watcher.take();
++ }
++ if (currentFile != null) {
+ in = new FileInputStream(currentFile);
++ if (notificationHandler != null) {
++ notificationHandler.notifyNewSource();
++ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new HyracksDataException(
+ "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (in == null) {
+ if (!advance()) {
+ return -1;
+ }
+ }
+ int result = in.read(b, off, len);
+ while ((result < 0) && advance()) {
+ // return a new line at the end of every file <--Might create problems for some cases
+ // depending on the parser implementation-->
+ if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_LF)) {
+ lastByte = ExternalDataConstants.BYTE_LF;
+ b[off] = ExternalDataConstants.BYTE_LF;
+ return 1;
+ }
+ // recursive call
+ result = in.read(b, off, len);
+ }
+ if (result > 0) {
+ lastByte = b[(off + result) - 1];
+ }
+ return result;
+ }
+
+ @Override
+ public boolean stop() throws Exception {
++ closeFile();
+ watcher.close();
+ return true;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ if (in == null) {
+ return false;
+ }
+ if (th instanceof IOException) {
+ // TODO: Change from string check to exception type
+ if (th.getCause().getMessage().contains("Malformed input stream")) {
+ if (currentFile != null) {
+ try {
+ logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+ } catch (IOException e) {
+ LOGGER.warn("Filed to write to feed log file", e);
+ }
+ LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+ }
+ try {
+ advance();
+ return true;
+ } catch (Exception e) {
- return false;
- }
- } else {
- try {
- watcher.init();
- } catch (IOException e) {
- LOGGER.warn("Failed to initialize watcher during failure recovery", e);
- return false;
++ LOGGER.warn("An exception was thrown while trying to skip a file", e);
+ }
+ }
- return true;
+ }
++ LOGGER.warn("Failed to recover from failure", th);
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 85d0e41,0000000..ae012f3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@@ -1,158 -1,0 +1,164 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.File;
- import java.io.IOException;
++import java.nio.file.Path;
++import java.util.ArrayList;
+import java.util.Map;
++import java.util.Set;
++import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+import org.apache.asterix.external.input.stream.LocalFSInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
- import org.apache.asterix.external.util.FeedUtils;
++import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class LocalFSInputStreamFactory implements IInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
+ protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamFactory.class.getName());
+ protected static INodeResolver nodeResolver;
+ protected Map<String, String> configuration;
+ protected FileSplit[] inputFileSplits;
- protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+ protected boolean isFeed;
+ protected String expression;
+ // transient fields (They don't need to be serialized and transferred)
+ private transient AlgebricksAbsolutePartitionConstraint constraints;
++ private transient FileSystemWatcher watcher;
+
+ @Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- try {
- return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
- } catch (IOException e) {
- throw new HyracksDataException(e);
++ public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
++ throws HyracksDataException {
++ if (watcher == null) {
++ String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
++ ArrayList<Path> inputResources = new ArrayList<>();
++ for (int i = 0; i < inputFileSplits.length; i++) {
++ if (inputFileSplits[i].getNodeName().equals(nodeName)) {
++ inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
++ }
++ }
++ watcher = new FileSystemWatcher(inputResources, expression, isFeed);
+ }
++ return new LocalFSInputStream(watcher);
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws AsterixException {
+ this.configuration = configuration;
+ String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
+ configureFileSplits(splits);
+ configurePartitionConstraint();
+ this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
- if (isFeed) {
- feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
- ExternalDataUtils.getFeedName(configuration), constraints);
- }
+ this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return constraints;
+ }
+
+ private void configureFileSplits(String[] splits) throws AsterixException {
++ INodeResolver resolver = getNodeResolver();
+ if (inputFileSplits == null) {
+ inputFileSplits = new FileSplit[splits.length];
+ String nodeName;
+ String nodeLocalPath;
+ int count = 0;
+ String trimmedValue;
+ for (String splitPath : splits) {
+ trimmedValue = splitPath.trim();
+ if (!trimmedValue.contains("://")) {
+ throw new AsterixException(
+ "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
+ }
- nodeName = trimmedValue.split(":")[0];
++ nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
+ nodeLocalPath = trimmedValue.split("://")[1];
+ FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+ inputFileSplits[count++] = fileSplit;
+ }
+ }
+ }
+
+ private void configurePartitionConstraint() throws AsterixException {
- String[] locs = new String[inputFileSplits.length];
- String location;
++ Set<String> locs = new TreeSet<>();
+ for (int i = 0; i < inputFileSplits.length; i++) {
- location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
- locs[i] = location;
++ String location = inputFileSplits[i].getNodeName();
++ locs.add(location);
+ }
- constraints = new AlgebricksAbsolutePartitionConstraint(locs);
++ constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
+ }
+
+ protected INodeResolver getNodeResolver() {
+ if (nodeResolver == null) {
+ synchronized (DEFAULT_NODE_RESOLVER) {
+ if (nodeResolver == null) {
+ nodeResolver = initializeNodeResolver();
+ }
+ }
+ }
+ return nodeResolver;
+ }
+
+ private static INodeResolver initializeNodeResolver() {
+ INodeResolver nodeResolver = null;
+ String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
+ if (configuredNodeResolverFactory != null) {
+ try {
+ nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
+ .createNodeResolver();
+
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
+ + configuredNodeResolverFactory + "\n" + e.getMessage());
+ }
+ nodeResolver = DEFAULT_NODE_RESOLVER;
+ }
+ } else {
+ nodeResolver = DEFAULT_NODE_RESOLVER;
+ }
+ return nodeResolver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index d362201,0000000..6ba27d8
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@@ -1,129 -1,0 +1,124 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.api.IRecordWithPKDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.dataflow.ChangeFeedDataFlowController;
+import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
+import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
+import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
+import org.apache.asterix.external.dataflow.FeedTupleForwarder;
+import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
+import org.apache.asterix.external.dataflow.IndexingDataFlowController;
+import org.apache.asterix.external.dataflow.RecordDataFlowController;
+import org.apache.asterix.external.dataflow.StreamDataFlowController;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class DataflowControllerProvider {
+
+ // TODO: Instead, use a factory just like data source and data parser.
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
+ int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
- Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
- throws HyracksDataException {
++ Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
++ throws HyracksDataException {
+ try {
- FeedLogManager feedLogManager = null;
- if (isFeed) {
- feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
- }
+ switch (dataSourceFactory.getDataSourceType()) {
+ case RECORDS:
+ IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
+ IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
+ IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
+ IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+ if (indexingOp) {
+ return new IndexingDataFlowController(ctx,
+ DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
+ recordReader, ((IIndexingDatasource) recordReader).getIndexer());
+ } else if (isFeed) {
+ FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils
+ .getTupleForwarder(configuration, feedLogManager);
+ boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
+ boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
+ if (isRecordWithMeta) {
+ if (isChangeFeed) {
+ int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
+ return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
+ numOfKeys + 2, (RecordWithMetadataParser) dataParser, recordReader);
+ } else {
+ return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
+ (RecordWithMetadataParser) dataParser, recordReader);
+ }
+ } else if (isChangeFeed) {
+ int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
+ return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
+ (IRecordWithPKDataParser) dataParser, recordReader);
+ } else {
+ return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
+ recordReader);
+ }
+ } else {
+ return new RecordDataFlowController(ctx,
+ DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
+ recordReader, 1);
+ }
+ case STREAM:
+ IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
+ AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
+ IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
+ IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
+ streamParser.setInputStream(stream);
+ if (isFeed) {
+ return new FeedStreamDataFlowController(ctx,
+ (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
+ feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
+ } else {
+ return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
+ streamParser);
+ }
+ default:
+ throw new HyracksDataException(
+ "Unknown data source type: " + dataSourceFactory.getDataSourceType());
+ }
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0,0000000..0f24f91
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@@ -1,149 -1,0 +1,115 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
+import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
+import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
++import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
++import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+public class DatasourceFactoryProvider {
+
+ public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
+ throws AsterixException {
+ if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
+ String reader = configuration.get(ExternalDataConstants.KEY_READER);
+ return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration);
+ } else {
+ // get stream source
+ String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE);
+ return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration);
+ }
+ }
+
- public static IInputStreamFactory getInputStreamFactory(String streamSource,
- Map<String, String> configuration) throws AsterixException {
++ public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
++ throws AsterixException {
+ IInputStreamFactory streamSourceFactory;
+ if (ExternalDataUtils.isExternal(streamSource)) {
+ String dataverse = ExternalDataUtils.getDataverse(configuration);
+ streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
+ } else {
+ switch (streamSource) {
- case ExternalDataConstants.STREAM_HDFS:
- streamSourceFactory = new HDFSDataSourceFactory();
- break;
+ case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+ streamSourceFactory = new LocalFSInputStreamFactory();
+ break;
- case ExternalDataConstants.STREAM_SOCKET:
++ case ExternalDataConstants.SOCKET:
+ case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+ streamSourceFactory = new SocketServerInputStreamFactory();
+ break;
+ case ExternalDataConstants.STREAM_SOCKET_CLIENT:
+ streamSourceFactory = new SocketServerInputStreamFactory();
+ break;
+ case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
+ streamSourceFactory = new TwitterFirehoseStreamFactory();
+ break;
+ default:
+ throw new AsterixException("unknown input stream factory");
+ }
+ }
+ return streamSourceFactory;
+ }
+
+ public static IRecordReaderFactory<?> getRecordReaderFactory(String reader, Map<String, String> configuration)
+ throws AsterixException {
+ if (reader.equals(ExternalDataConstants.EXTERNAL)) {
+ return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
+ }
- String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
- IInputStreamFactory inputStreamFactory;
- switch (parser) {
- case ExternalDataConstants.FORMAT_ADM:
- case ExternalDataConstants.FORMAT_JSON:
- case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_LINE_SEPARATED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
- case ExternalDataConstants.FORMAT_CSV:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
- switch (reader) {
- case ExternalDataConstants.READER_KV:
- return new KVReaderFactory();
- case ExternalDataConstants.READER_KV_TEST:
- return new KVTestReaderFactory();
- }
- }
- String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
- if (format != null) {
- switch (format) {
- case ExternalDataConstants.FORMAT_ADM:
- case ExternalDataConstants.FORMAT_JSON:
- case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_LINE_SEPARATED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new EmptyLineSeparatedRecordReaderFactory()
- .setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
- case ExternalDataConstants.FORMAT_CSV:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- }
- }
+ switch (reader) {
++ case ExternalDataConstants.READER_KV:
++ return new KVReaderFactory();
++ case ExternalDataConstants.READER_KV_TEST:
++ return new KVTestReaderFactory();
+ case ExternalDataConstants.READER_HDFS:
+ return new HDFSDataSourceFactory();
++ case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
++ return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
+ case ExternalDataConstants.READER_TWITTER_PULL:
+ case ExternalDataConstants.READER_TWITTER_PUSH:
++ case ExternalDataConstants.READER_PUSH_TWITTER:
++ case ExternalDataConstants.READER_PULL_TWITTER:
+ return new TwitterRecordReaderFactory();
- case ExternalDataConstants.READER_KV:
- return new KVReaderFactory();
- case ExternalDataConstants.READER_KV_TEST:
- return new KVTestReaderFactory();
+ case ExternalDataConstants.TEST_RECORD_WITH_PK:
+ return new RecordWithPKTestReaderFactory();
++ case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
++ return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
++ case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
++ case ExternalDataConstants.SOCKET:
++ return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
++ case ExternalDataConstants.STREAM_SOCKET_CLIENT:
++ return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
+ default:
+ throw new AsterixException("unknown record reader factory: " + reader);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 06928b3,0000000..682fb89
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@@ -1,76 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
+import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
+import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
+import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
+import org.apache.asterix.external.parser.factory.TestRecordWithPKParserFactory;
+import org.apache.asterix.external.parser.factory.TweetParserFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+public class ParserFactoryProvider {
+    /**
+     * Creates the data parser factory selected by an adapter configuration.
+     * <p>
+     * When the {@link ExternalDataConstants#KEY_DATA_PARSER} parameter names an external library
+     * class (see {@link ExternalDataUtils#isExternal(String)}), that factory is loaded from the
+     * library of the configured dataverse. Otherwise the record format returned by
+     * {@link ExternalDataUtils#getRecordFormat(Map)} selects one of the builtin factories.
+     *
+     * @param configuration the adapter configuration
+     * @return a new parser factory
+     * @throws AsterixException if no format is configured, the format is unknown, or the external
+     *         parser factory cannot be created
+     */
+    public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException {
+        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        // isExternal(...) already returns false for a null name.
+        if (ExternalDataUtils.isExternal(parserFactoryName)) {
+            return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
+                    parserFactoryName);
+        }
+        return getDataParserFactory(ExternalDataUtils.getRecordFormat(configuration));
+    }
+
+    /**
+     * Creates the builtin parser factory registered for a record format name.
+     *
+     * @param parser the format name; several aliases map to the same factory
+     * @return a new parser factory
+     * @throws AsterixException if {@code parser} is null or is not a known format
+     */
+    @SuppressWarnings("rawtypes")
+    public static IDataParserFactory getDataParserFactory(@Nonnull String parser) throws AsterixException {
+        if (parser == null) {
+            // A switch on a null String throws NullPointerException; report the missing parameter instead.
+            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
+        }
+        switch (parser) {
+            case ExternalDataConstants.FORMAT_ADM:
+            case ExternalDataConstants.FORMAT_JSON:
+            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
+                return new ADMDataParserFactory();
+            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
+            case ExternalDataConstants.FORMAT_CSV:
+                return new DelimitedDataParserFactory();
+            case ExternalDataConstants.FORMAT_HIVE:
+            case ExternalDataConstants.PARSER_HIVE:
+                return new HiveDataParserFactory();
+            case ExternalDataConstants.FORMAT_TWEET:
+                return new TweetParserFactory();
+            case ExternalDataConstants.FORMAT_RSS:
+                return new RSSParserFactory();
+            case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
+                return new RecordWithMetadataParserFactory();
+            case ExternalDataConstants.TEST_RECORD_WITH_PK:
+                return new TestRecordWithPKParserFactory();
+            default:
-                throw new AsterixException("Unknown parser " + parser);
++                throw new AsterixException("Unknown format: " + parser);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a02152b,0000000..b5ec27a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@@ -1,232 -1,0 +1,232 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Shared names and default values for the external data layer: configuration parameter keys,
+ * builtin reader/stream/format names, adapter aliases, characters, sizes and error messages.
+ * <p>
+ * Several constants share a string value (e.g. KEY_STREAM, KEY_READER_STREAM and READER_STREAM are
+ * all "stream"; KEY_DATA_PARSER and KEY_PARSER are both "parser"; KEY_SOURCE_DATATYPE and
+ * KEY_TYPE_NAME are both "type-name"), so keys with equal values address the same configuration entry.
+ */
+public class ExternalDataConstants {
+ // TODO: Remove unused variables.
+ /**
+ * Keys
+ */
+ // used to specify the stream factory for an adapter that has a stream data source
+ public static final String KEY_STREAM = "stream";
+ // used to specify the dataverse of the adapter
+ public static final String KEY_DATAVERSE = "dataverse";
+ // used to specify the socket addresses when reading data from sockets
+ public static final String KEY_SOCKETS = "sockets";
+ // specify whether the socket address points to an NC or an IP
+ public static final String KEY_MODE = "address-type";
+ // specify the HDFS name node address when reading HDFS data
+ public static final String KEY_HDFS_URL = "hdfs";
+ // specify the path when reading from a file system
+ public static final String KEY_PATH = "path";
+ // specify the HDFS input format when reading data from HDFS
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ // specifies the filesystem (localfs or HDFS) when using a filesystem data source
+ public static final String KEY_FILESYSTEM = "fs";
+ // specifies the address of the HDFS name node
+ public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+ // specifies the class implementation of the accessed instance of HDFS
+ public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
+ // Hadoop configuration property names (input directory/format, short-circuit reads, buffer size)
+ public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
+ public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
+ public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
+ public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
+ public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
+ // same value as KEY_TYPE_NAME
+ public static final String KEY_SOURCE_DATATYPE = "type-name";
+ public static final String KEY_DELIMITER = "delimiter";
+ public static final String KEY_PARSER_FACTORY = "tuple-parser";
+ // same value as KEY_PARSER
+ public static final String KEY_DATA_PARSER = "parser";
+ public static final String KEY_HEADER = "header";
+ public static final String KEY_READER = "reader";
+ // same value as KEY_STREAM and READER_STREAM
+ public static final String KEY_READER_STREAM = "stream";
+ public static final String KEY_TYPE_NAME = "type-name";
+ public static final String KEY_RECORD_START = "record-start";
+ public static final String KEY_RECORD_END = "record-end";
+ public static final String KEY_EXPRESSION = "expression";
+ public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
+ public static final String KEY_FORMAT = "format";
+ public static final String KEY_QUOTE = "quote";
+ public static final String KEY_PARSER = "parser";
+ public static final String KEY_DATASET_RECORD = "dataset-record";
+ public static final String KEY_HIVE_SERDE = "hive-serde";
+ public static final String KEY_RSS_URL = "url";
+ public static final String KEY_INTERVAL = "interval";
- public static final String KEY_PULL = "pull";
- public static final String KEY_PUSH = "push";
+ public static final String KEY_IS_FEED = "is-feed";
+ public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
+ public static final String KEY_FEED_NAME = "feed";
+ // a string representing external bucket name
+ public static final String KEY_BUCKET = "bucket";
+ // a comma delimited list of nodes
+ public static final String KEY_NODES = "nodes";
+ // a string representing the password used to authenticate with the external data source
+ public static final String KEY_PASSWORD = "password";
+ // an integer representing the number of raw records that can be buffered in the parsing queue
+ public static final String KEY_QUEUE_SIZE = "queue-size";
+ // comma-delimited integers representing the indexes of the meta fields in the raw record (i.e., "3,1,0,2" denotes that the first meta field is at index 3 in the actual record)
+ public static final String KEY_META_INDEXES = "meta-indexes";
+ // an integer representing the index of the value field in the data type
+ public static final String KEY_VALUE_INDEX = "value-index";
+ // a string representing the format of the raw record in the value field in the data type
+ public static final String KEY_VALUE_FORMAT = "value-format";
+ // a boolean indicating whether the feed is a change feed
+ public static final String KEY_IS_CHANGE_FEED = "change-feed";
+ // an integer representing the number of keys in a change feed
+ public static final String KEY_KEY_SIZE = "key-size";
+ // the format name of the record-with-metadata parser; ExternalDataUtils.setRecordWithMeta also uses it
+ // as a configuration key holding a boolean string
+ public static final String FORMAT_RECORD_WITH_METADATA = "record-with-metadata";
+ // a string representing the format of the record (for adapters which produces records with additional information like pk or metadata)
+ public static final String KEY_RECORD_FORMAT = "record-format";
+ public static final String KEY_META_TYPE_NAME = "meta-type-name";
+ // reader value that selects a stream-based data source (see ExternalDataUtils.getDataSourceType)
+ public static final String READER_STREAM = "stream";
+ /**
+ * HDFS class names
+ */
+ public static final String CLASS_NAME_TEXT_INPUT_FORMAT = TextInputFormat.class.getName();
+ public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = SequenceFileInputFormat.class.getName();
+ public static final String CLASS_NAME_RC_INPUT_FORMAT = RCFileInputFormat.class.getName();
+ public static final String CLASS_NAME_HDFS_FILESYSTEM = DistributedFileSystem.class.getName();
+ /**
+ * input formats aliases
+ */
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+ public static final String INPUT_FORMAT_RC = "rc-input-format";
+ /**
+ * Builtin streams
+ */
+ // (none are declared in this section; the stream names are under "input streams" below)
+
+ /**
+ * Builtin record readers
+ */
+ public static final String READER_HDFS = "hdfs";
+ public static final String READER_KV = "key-value";
- public static final String READER_TWITTER_PUSH = "twitter-push";
- public static final String READER_TWITTER_PULL = "twitter-pull";
+ // both word orders are defined as reader names for the push and pull twitter readers
++ public static final String READER_TWITTER_PUSH = "twitter_push";
++ public static final String READER_PUSH_TWITTER = "push_twitter";
++ public static final String READER_TWITTER_PULL = "twitter_pull";
++ public static final String READER_PULL_TWITTER = "pull_twitter";
+
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static final String SCHEDULER = "hdfs-scheduler";
+ public static final String PARSER_HIVE = "hive-parser";
+ public static final String HAS_HEADER = "has.header";
+ public static final String TIME_TRACKING = "time.tracking";
+ public static final String DEFAULT_QUOTE = "\"";
+ public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+ public static final String DEFAULT_DELIMITER = ",";
+ // separates a library name from a class name in external (library-provided) factory names
+ public static final String EXTERNAL_LIBRARY_SEPARATOR = "#";
+ public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
+ /**
+ * supported builtin record formats
+ */
+ public static final String FORMAT_HIVE = "hive";
+ public static final String FORMAT_BINARY = "binary";
+ public static final String FORMAT_ADM = "adm";
+ public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+ public static final String FORMAT_TWEET = "twitter-status";
+ public static final String FORMAT_RSS = "rss";
+ public static final String FORMAT_SEMISTRUCTURED = "semi-structured";
+ public static final String FORMAT_LINE_SEPARATED = "line-separated";
+ public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
+ public static final String FORMAT_KV = "kv";
+
+ /**
+ * input streams
+ */
+ public static final String STREAM_HDFS = "hdfs";
+ public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
- public static final String STREAM_SOCKET = "socket";
++ public static final String SOCKET = "socket";
+ public static final String STREAM_SOCKET_CLIENT = "socket-client";
+
+ /**
+ * adapter aliases
+ */
+ public static final String ALIAS_GENERIC_ADAPTER = "adapter";
+ public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
+ public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
+ public static final String ALIAS_HDFS_ADAPTER = "hdfs";
+ public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
+ public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
+ public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
+ public static final String ALIAS_RSS_ADAPTER = "rss_feed";
+ public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
+ public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
+ public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
+ public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
+ public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
+ public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = "change_feed_with_meta";
+ // for testing purposes
+ public static final String ALIAS_TEST_CHANGE_ADAPTER = "test_change_feed";
+
+ /**
+ * Constant String values
+ */
+ public static final String TRUE = "true";
+ public static final String FALSE = "false";
+
+ /**
+ * Constant characters
+ */
+ public static final char ESCAPE = '\\';
+ public static final char QUOTE = '"';
+ public static final char SPACE = ' ';
+ public static final char TAB = '\t';
+ public static final char LF = '\n';
+ public static final char CR = '\r';
+ public static final char DEFAULT_RECORD_START = '{';
+ public static final char DEFAULT_RECORD_END = '}';
+
+ /**
+ * Constant byte characters
+ */
+ public static final byte BYTE_LF = '\n';
+ public static final byte BYTE_CR = '\r';
+ /**
+ * Size default values
+ */
+ public static final int DEFAULT_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_BUFFER_INCREMENT = 2048;
+ public static final int DEFAULT_QUEUE_SIZE = 64;
+ public static final int MAX_RECORD_SIZE = 32000000;
+
+ /**
+ * Expected parameter values
+ */
+ public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
+ // same text as ERROR_LARGE_RECORD
+ public static final String LARGE_RECORD_ERROR_MESSAGE = "Record is too large";
+ // the constants below are miscellaneous keys/names that are not grouped with the sections above
+ public static final String KEY_RECORD_INDEX = "record-index";
+ public static final String FORMAT_DCP = "dcp";
+ public static final String KEY_KEY_INDEXES = "key-indexes";
+ public static final String KEY_KEY_INDICATORS = "key-indicators";
+ public static final String KEY_STREAM_SOURCE = "stream-source";
+ public static final String EXTERNAL = "external";
+ public static final String KEY_READER_FACTORY = "reader-factory";
+ public static final String READER_KV_TEST = "kv_test";
+ public static final String READER_RSS = "rss";
+ public static final String FORMAT_CSV = "csv";
+ public static final String TEST_RECORD_WITH_PK = "test-record-with-pk";
+
+ public static final String ERROR_LARGE_RECORD = "Record is too large";
+ public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 42fe8bf,0000000..76898c2
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@@ -1,342 -1,0 +1,326 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+
+public class ExternalDataUtils {
+
+    /**
+     * Returns the single-character field delimiter configured under
+     * {@link ExternalDataConstants#KEY_DELIMITER}, or {@link ExternalDataConstants#DEFAULT_DELIMITER}
+     * when the parameter is absent.
+     *
+     * @throws AsterixException if the configured delimiter is not exactly one character long
+     */
+    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+        String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+        if (delimiterValue == null) {
+            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
+        } else if (delimiterValue.length() != 1) {
+            throw new AsterixException(
+                    "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+        }
+        return delimiterValue.charAt(0);
+    }
+
+    /**
+     * Returns the single-character quote configured under {@link ExternalDataConstants#KEY_QUOTE}, or
+     * {@link ExternalDataConstants#DEFAULT_QUOTE} when the parameter is absent.
+     *
+     * @param delimiter the field delimiter in use; the quote must be a different character
+     * @throws AsterixException if the quote is not exactly one character long or equals the delimiter
+     */
+    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteValue == null) {
+            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
+        } else if (quoteValue.length() != 1) {
+            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+        }
+
+        // A char delimiter cannot be null, so the only conflict to check is a shared character.
+        if (quoteValue.charAt(0) == delimiter) {
+            throw new AsterixException(
+                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
+        }
+
+        return quoteValue.charAt(0);
+    }
+
+    /**
+     * Returns whether {@link ExternalDataConstants#KEY_HEADER} is set to "true" (case-insensitive);
+     * false when absent.
+     */
+    public static boolean getHasHeader(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
+    }
+
+    /**
+     * Checks that both the reader and the format parameters are present.
+     *
+     * @throws AsterixException naming the first missing parameter
+     */
+    public static void validateParameters(Map<String, String> configuration) throws AsterixException {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER);
+        if (reader == null) {
+            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
+        }
-        String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
++        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (parser == null) {
-            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_PARSER + " must be specified.");
++            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
+        }
+    }
+
+    /**
+     * Returns {@link DataSourceType#STREAM} when the configured reader is
+     * {@link ExternalDataConstants#READER_STREAM}, and {@link DataSourceType#RECORDS} otherwise
+     * (including when no reader is configured).
+     */
+    public static DataSourceType getDataSourceType(Map<String, String> configuration) {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER);
+        return ExternalDataConstants.READER_STREAM.equals(reader) ? DataSourceType.STREAM : DataSourceType.RECORDS;
+    }
+
+    /**
+     * Returns whether {@code aString} names an external library entity: it is non-null, contains
+     * {@link ExternalDataConstants#EXTERNAL_LIBRARY_SEPARATOR} and is longer than one character after
+     * trimming.
+     */
+    public static boolean isExternal(String aString) {
+        return ((aString != null) && aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)
+                && (aString.trim().length() > 1));
+    }
+
+    /** Returns the class loader that {@link ExternalLibraryManager} holds for a dataverse's library. */
+    public static ClassLoader getClassLoader(String dataverse, String library) {
+        return ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+    }
+
+    /**
+     * Returns the first segment of the trimmed string when split on
+     * {@code FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR} (interpreted as a regular
+     * expression by {@link String#split(String)}).
+     */
+    public static String getLibraryName(String aString) {
+        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+    }
+
+    /**
+     * Returns the second segment of the trimmed string when split on
+     * {@code FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR}; throws
+     * {@link ArrayIndexOutOfBoundsException} when the separator does not occur.
+     */
+    public static String getExternalClassName(String aString) {
+        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
+    }
+
+    /**
+     * Instantiates an input stream factory class provided by an external library.
+     *
+     * @param dataverse the dataverse owning the library
+     * @param stream library name and class name joined by the library-name separator
+     * @throws AsterixException if the class cannot be found or instantiated
+     */
+    public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream)
+            throws AsterixException {
+        try {
+            String libraryName = getLibraryName(stream);
+            String className = getExternalClassName(stream);
+            ClassLoader classLoader = getClassLoader(dataverse, libraryName);
+            return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance()));
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AsterixException("Failed to create stream factory", e);
+        }
+    }
+
+    /** Returns the {@link ExternalDataConstants#KEY_DATAVERSE} parameter, or null. */
+    public static String getDataverse(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
+    }
+
+    /**
+     * Returns the {@link ExternalDataConstants#KEY_DATA_PARSER} parameter, falling back to
+     * {@link ExternalDataConstants#KEY_FORMAT}; null when neither is set.
+     */
+    public static String getRecordFormat(Map<String, String> configuration) {
+        String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
+    }
+
+    /**
+     * Sets both the parser and the format parameters to {@code format}, leaving any key that is
+     * already present untouched.
+     */
+    public static void setRecordFormat(Map<String, String> configuration, String format) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
+            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
+        }
+    }
+
+    // Value parser factories for the field types that delimited parsing supports.
+    private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<>();
+        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        return m;
+    }
+
+    /**
+     * Returns one value parser factory per field of {@code recordType}, in field order.
+     * <p>
+     * A UNION-typed field is accepted only in the optional form: a two-member union whose first
+     * member is NULL. Its parser is then chosen from the second member.
+     *
+     * @throws NotImplementedException for any other UNION, or for a field type without a value parser
+     */
+    public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        int n = fieldTypes.length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag;
+            if (fieldTypes[i].getTypeTag() == ATypeTag.UNION) {
+                List<IAType> unionTypes = ((AUnionType) fieldTypes[i]).getUnionList();
+                // Only the optional form [NULL, T] is supported; reject it if EITHER condition fails.
+                if ((unionTypes.size() != 2) || (unionTypes.get(0).getTypeTag() != ATypeTag.NULL)) {
+                    throw new NotImplementedException("Non-optional UNION type is not supported.");
+                }
+                tag = unionTypes.get(1).getTypeTag();
+            } else {
+                tag = fieldTypes[i].getTypeTag();
+            }
+            if (tag == null) {
+                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+            }
+            fieldParserFactories[i] = getParserFactory(tag);
+        }
+        return fieldParserFactories;
+    }
+
+    /**
+     * Returns the value parser factory for INT32, FLOAT, DOUBLE, INT64 or STRING.
+     *
+     * @throws NotImplementedException for any other type tag
+     */
+    public static IValueParserFactory getParserFactory(ATypeTag tag) {
+        IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+        if (vpf == null) {
+            throw new NotImplementedException("No value parser factory for fields of type " + tag);
+        }
+        return vpf;
+    }
+
+    /** Returns the {@link ExternalDataConstants#KEY_READER_STREAM} parameter, or null. */
+    public static String getRecordReaderStreamName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
+    }
+
+    /** Same as {@link #getHasHeader(Map)}; kept for existing callers. */
+    public static boolean hasHeader(Map<String, String> configuration) {
+        return getHasHeader(configuration);
+    }
+
-    public static boolean isPull(Map<String, String> configuration) {
-        String pull = configuration.get(ExternalDataConstants.KEY_PULL);
-        if (pull == null) {
-            return false;
-        }
-        return Boolean.parseBoolean(pull);
-    }
-
-    public static boolean isPush(Map<String, String> configuration) {
-        String push = configuration.get(ExternalDataConstants.KEY_PUSH);
-        if (push == null) {
-            return false;
-        }
-        return Boolean.parseBoolean(push);
-    }
-
+    /**
+     * Instantiates the record reader factory named by {@link ExternalDataConstants#KEY_READER_FACTORY},
+     * whose value must have the form "DataverseName.LibraryName#ReaderFactoryFullyQualifiedName".
+     *
+     * @throws AsterixException if the parameter is missing or malformed, or the class cannot be
+     *         found or instantiated
+     */
+    public static IRecordReaderFactory<?> createExternalRecordReaderFactory(Map<String, String> configuration)
+            throws AsterixException {
+        String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
+        if (readerFactory == null) {
+            throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
+                    + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
+        }
+        String formatError = "The parameter " + ExternalDataConstants.KEY_READER_FACTORY
+                + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"";
+        String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR);
+        if ((libraryAndFactory.length != 2) || libraryAndFactory[0].isEmpty() || libraryAndFactory[1].isEmpty()) {
+            throw new AsterixException(formatError);
+        }
+        // String.split takes a regular expression: an unescaped "." matches every character and
+        // would always yield an empty array.
+        String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
+        if ((dataverseAndLibrary.length != 2) || dataverseAndLibrary[0].isEmpty()
+                || dataverseAndLibrary[1].isEmpty()) {
+            throw new AsterixException(formatError);
+        }
+
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseAndLibrary[0],
+                dataverseAndLibrary[1]);
+        try {
+            return (IRecordReaderFactory<?>) classLoader.loadClass(libraryAndFactory[1]).newInstance();
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AsterixException("Failed to create record reader factory", e);
+        }
+    }
+
+    /**
+     * Instantiates a parser factory class provided by an external library.
+     *
+     * @param dataverse the dataverse owning the library
+     * @param parserFactoryName "LibraryName#ParserFactoryFullyQualifiedName"
+     * @throws AsterixException if the name lacks the separator, or the class cannot be found or
+     *         instantiated
+     */
+    public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName)
+            throws AsterixException {
+        int separatorIndex = parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR);
+        if (separatorIndex < 0) {
+            throw new AsterixException("The parser factory name " + parserFactoryName
+                    + " must follow the format \"LibraryName#ParserFactoryFullyQualifiedName\"");
+        }
+        String library = parserFactoryName.substring(0, separatorIndex);
+        String className = parserFactoryName
+                .substring(separatorIndex + ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR.length());
+        try {
+            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+            return (IDataParserFactory) classLoader.loadClass(className).newInstance();
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AsterixException("Failed to create an external parser factory", e);
+        }
+    }
+
+    /** Returns whether {@link ExternalDataConstants#KEY_IS_FEED} is set to "true"; false when absent. */
+    public static boolean isFeed(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
+    }
+
+    /**
+     * Marks {@code configuration} as a feed (unless the is-feed parameter is already present) and
+     * records the dataverse and feed names in it.
+     */
+    public static void prepareFeed(Map<String, String> configuration, String dataverseName, String feedName) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
+        }
+        configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataverseName);
+        configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
+    }
+
+    /**
+     * Returns true when {@link ExternalDataConstants#KEY_WAIT_FOR_DATA} is absent; otherwise whether
+     * it is set to "true".
+     */
+    public static boolean keepDataSourceOpen(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)) {
+            return true;
+        }
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
+    }
+
+    /** Returns the {@link ExternalDataConstants#KEY_FEED_NAME} parameter, or null. */
+    public static String getFeedName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+    }
+
+    /**
+     * Returns the {@link ExternalDataConstants#KEY_QUEUE_SIZE} parameter, or
+     * {@link ExternalDataConstants#DEFAULT_QUEUE_SIZE} when absent.
+     *
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public static int getQueueSize(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)
+                ? Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
+                : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
+    }
+
+    /** Returns whether a meta type name ({@link ExternalDataConstants#KEY_META_TYPE_NAME}) is configured. */
+    public static boolean isRecordWithMeta(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME);
+    }
+
+    /**
+     * Stores {@code booleanString} under {@link ExternalDataConstants#FORMAT_RECORD_WITH_METADATA}.
+     * NOTE(review): {@link #isRecordWithMeta(Map)} reads a different key (KEY_META_TYPE_NAME), so
+     * this setter does not affect it; confirm the asymmetry is intended.
+     */
+    public static void setRecordWithMeta(Map<String, String> configuration, String booleanString) {
+        configuration.put(ExternalDataConstants.FORMAT_RECORD_WITH_METADATA, booleanString);
+    }
+
+    /** Returns whether {@link ExternalDataConstants#KEY_IS_CHANGE_FEED} is set to "true". */
+    public static boolean isChangeFeed(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
+    }
+
+    /**
+     * Returns the number of comma-separated entries in {@link ExternalDataConstants#KEY_KEY_INDEXES}.
+     *
+     * @throws AsterixException if the parameter is absent
+     */
+    public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
+        String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
+        if (keyIndexes == null) {
+            throw new AsterixException(
+                    "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+        }
+        return keyIndexes.split(",").length;
+    }
+
+    /** Stores {@code value} under {@link ExternalDataConstants#KEY_KEY_SIZE}. */
+    public static void setNumberOfKeys(Map<String, String> configuration, int value) {
+        configuration.put(ExternalDataConstants.KEY_KEY_SIZE, String.valueOf(value));
+    }
+
+    /** Stores {@code booleanString} under {@link ExternalDataConstants#KEY_IS_CHANGE_FEED}. */
+    public static void setChangeFeed(Map<String, String> configuration, String booleanString) {
+        configuration.put(ExternalDataConstants.KEY_IS_CHANGE_FEED, booleanString);
+    }
+
+    /**
+     * Parses {@link ExternalDataConstants#KEY_KEY_INDEXES} as a comma-separated list of integers.
+     *
+     * @throws IllegalArgumentException if the parameter is absent
+     * @throws NumberFormatException if an entry is not an integer
+     */
+    public static int[] getPKIndexes(Map<String, String> configuration) {
+        return parseIntList(configuration, ExternalDataConstants.KEY_KEY_INDEXES);
+    }
+
+    /**
+     * Parses {@link ExternalDataConstants#KEY_KEY_INDICATORS} as a comma-separated list of integers.
+     *
+     * @throws IllegalArgumentException if the parameter is absent
+     * @throws NumberFormatException if an entry is not an integer
+     */
+    public static int[] getPKSourceIndicators(Map<String, String> configuration) {
+        return parseIntList(configuration, ExternalDataConstants.KEY_KEY_INDICATORS);
+    }
+
+    // Parses the comma-separated integers stored under key; whitespace around entries is ignored.
+    private static int[] parseIntList(Map<String, String> configuration, String key) {
+        String value = configuration.get(key);
+        if (value == null) {
+            throw new IllegalArgumentException("The parameter " + key + " must be specified.");
+        }
+        String[] entries = value.split(",");
+        int[] result = new int[entries.length];
+        for (int i = 0; i < entries.length; i++) {
+            result[i] = Integer.parseInt(entries[i].trim());
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index fc15d3c,0000000..5bb8ec3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@@ -1,172 -1,0 +1,180 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TreeSet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
++/**
++ * Manages the on-disk logs of a feed adapter kept in directory {@code dir}: a progress log, an error log and a
++ * bad-record log (file names below), all written in append mode.
++ * <p>
++ * On {@link #open()} the progress log is scanned and every split id found on an {@link #END_PREFIX} line is
++ * remembered, so {@link #isSplitRead(String)} reports it as completed.
++ * <p>
++ * Thread-safety: every instance method except {@link #exists()} is {@code synchronized} on this instance.
++ */
+public class FeedLogManager {
+
++ // NOTE(review): LogEntryType is not referenced anywhere in this class.
+ public enum LogEntryType {
+ START, // partition start
+ END, // partition end
+ COMMIT, // a record commit within a partition
- SNAPSHOT // an identifier that partitions with identifiers before this one should be
- // ignored
++ SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored
+ }
+
+ public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
+ public static final String ERROR_LOG_FILE_NAME = "error.log";
+ public static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
+ public static final String START_PREFIX = "s:";
+ public static final String END_PREFIX = "e:";
++ // Must equal the length of START_PREFIX and END_PREFIX; used by getSplitId to strip them.
+ public static final int PREFIX_SIZE = 2;
+ private String currentPartition;
+ private final TreeSet<String> completed;
+ private final Path dir;
+ private BufferedWriter progressLogger;
+ private BufferedWriter errorLogger;
+ private BufferedWriter recordLogger;
++ // Scratch buffer reused by the log* methods; reuse is safe only because those methods are synchronized.
+ private final StringBuilder stringBuilder = new StringBuilder();
++ // Reference count: touch() increments it, close() decrements it and closes the writers once it is <= 0.
++ private int count = 0;
+
++ /**
++ * Opens the feed logs in directory {@code file}, creating the directory and empty log files first if the
++ * directory does not exist.
++ * <p>
++ * Only the directory's existence is checked: if it exists but a log file is missing, {@link #open()} fails
++ * (the writers use APPEND without CREATE).
++ * NOTE(review): calls the overridable methods exists(), create() and open() from the constructor.
++ *
++ * @param file the log directory
++ * @throws HyracksDataException wrapping any {@link IOException} from creating or opening the logs
++ */
+ public FeedLogManager(File file) throws HyracksDataException {
+ try {
+ this.dir = file.toPath();
+ this.completed = new TreeSet<String>();
+ if (!exists()) {
+ create();
+ }
+ open();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
++ /**
++ * Increments the reference count consulted by {@link #close()}.
++ * NOTE(review): the constructor does not call touch(), so the count starts at 0; a close() without a prior
++ * touch() drives it to -1 and closes the writers. Confirm callers pair touch()/close() as intended.
++ */
- public void endPartition() throws IOException {
++ public synchronized void touch() {
++ count++;
++ }
++
++ /**
++ * Logs an end entry for the partition most recently set by {@link #startPartition(String)} or
++ * {@link #endPartition(String)} and marks it completed. If none was set, "e:null" is logged.
++ */
++ public synchronized void endPartition() throws IOException {
+ logProgress(END_PREFIX + currentPartition);
+ completed.add(currentPartition);
+ }
+
++ /** Makes {@code partition} the current partition, logs an end entry for it and marks it completed. */
- public void endPartition(String partition) throws IOException {
++ public synchronized void endPartition(String partition) throws IOException {
+ currentPartition = partition;
+ logProgress(END_PREFIX + currentPartition);
+ completed.add(currentPartition);
+ }
+
++ /** Makes {@code partition} the current partition and logs a start entry for it. */
- public void startPartition(String partition) throws IOException {
++ public synchronized void startPartition(String partition) throws IOException {
+ currentPartition = partition;
+ logProgress(START_PREFIX + currentPartition);
+ }
+
++ /** Returns whether the log directory exists. Unlike the other instance methods, this one is not synchronized. */
+ public boolean exists() {
+ return Files.exists(dir);
+ }
+
++ /**
++ * Loads the completed split ids from the progress log, then opens the three log writers in append mode.
++ * NOTE(review): the reader is not closed if readLine() throws (try-with-resources would fix this); if a later
++ * writer fails to open, the earlier writers stay open; calling open() again replaces the writers without
++ * closing the previous ones.
++ */
- public void open() throws IOException {
++ public synchronized void open() throws IOException {
+ // read content of logs.
+ BufferedReader reader = Files.newBufferedReader(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
+ String log = reader.readLine();
+ while (log != null) {
+ if (log.startsWith(END_PREFIX)) {
+ completed.add(getSplitId(log));
+ }
+ log = reader.readLine();
+ }
+ reader.close();
+
+ progressLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ errorLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + ERROR_LOG_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ recordLogger = Files.newBufferedWriter(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME),
+ StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+ }
+
++ /** Decrements the reference count and, once it is zero or below, closes the three log writers. */
- public void close() throws IOException {
++ public synchronized void close() throws IOException {
++ count--;
++ if (count > 0) {
++ return;
++ }
+ progressLogger.close();
+ errorLogger.close();
+ recordLogger.close();
+ }
+
++ /**
++ * Creates the log directory and empty log files. Always returns {@code true}; the results of
++ * {@code mkdirs()} and {@code createNewFile()} are ignored.
++ */
- public boolean create() throws IOException {
++ public synchronized boolean create() throws IOException {
+ File f = dir.toFile();
+ f.mkdirs();
+ new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
+ new File(f, ERROR_LOG_FILE_NAME).createNewFile();
+ new File(f, BAD_RECORDS_FILE_NAME).createNewFile();
+ return true;
+ }
+
++ /**
++ * Deletes the log directory via {@link FileUtils#deleteDirectory(File)}. Always returns {@code true}.
++ * NOTE(review): the writers are not closed first; callers presumably call close() before destroy(). Verify.
++ */
- public boolean destroy() throws IOException {
++ public synchronized boolean destroy() throws IOException {
+ File f = dir.toFile();
+ FileUtils.deleteDirectory(f);
+ return true;
+ }
+
++ /** Appends {@code log} followed by {@code ExternalDataConstants.LF} to the progress log and flushes it. */
- public void logProgress(String log) throws IOException {
++ public synchronized void logProgress(String log) throws IOException {
+ stringBuilder.setLength(0);
+ stringBuilder.append(log);
+ stringBuilder.append(ExternalDataConstants.LF);
+ progressLogger.write(stringBuilder.toString());
+ progressLogger.flush();
+ }
+
++ /**
++ * Appends {@code error} and {@code th.toString()}, each followed by {@code ExternalDataConstants.LF}, to the
++ * error log and flushes it. Only the throwable's toString() is written, not its stack trace.
++ */
- public void logError(String error, Throwable th) throws IOException {
++ public synchronized void logError(String error, Throwable th) throws IOException {
+ stringBuilder.setLength(0);
+ stringBuilder.append(error);
+ stringBuilder.append(ExternalDataConstants.LF);
+ stringBuilder.append(th.toString());
+ stringBuilder.append(ExternalDataConstants.LF);
+ errorLogger.write(stringBuilder.toString());
+ errorLogger.flush();
+ }
+
++ /**
++ * Appends {@code record} and {@code errorMessage}, each followed by {@code ExternalDataConstants.LF}, to the
++ * bad-record log and flushes it.
++ */
- public void logRecord(String record, String errorMessage) throws IOException {
++ public synchronized void logRecord(String record, String errorMessage) throws IOException {
+ stringBuilder.setLength(0);
+ stringBuilder.append(record);
+ stringBuilder.append(ExternalDataConstants.LF);
+ stringBuilder.append(errorMessage);
+ stringBuilder.append(ExternalDataConstants.LF);
+ recordLogger.write(stringBuilder.toString());
+ recordLogger.flush();
+ }
+
++ /** Strips the {@value #PREFIX_SIZE}-character start/end prefix from a progress-log line. */
+ public static String getSplitId(String log) {
+ return log.substring(PREFIX_SIZE);
+ }
+
++ /** Returns whether {@code split} is marked completed (loaded from the progress log or ended via this instance). */
- public boolean isSplitRead(String split) {
++ public synchronized boolean isSplitRead(String split) {
+ return completed.contains(split);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af,0000000..502a432
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@@ -1,123 -1,0 +1,111 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
++/**
++ * Static helpers for feeds: file splits for per-node feed log directories, {@link FeedLogManager} construction,
++ * and extraction of a trailing feed message from a frame.
++ * NOTE(review): utility class without a private constructor.
++ */
+public class FeedUtils {
++ /** Returns the relative path {@code dataverseName + File.separator + feedName}. */
+ private static String prepareDataverseFeedName(String dataverseName, String feedName) {
+ return dataverseName + File.separator + feedName;
+ }
+
++ /**
++ * Returns the file split of a node's feed log directory,
++ * 'storage dir name'/partition_#/dataverse/feed/nodeName, placed in the given cluster partition.
++ *
++ * @param nodeName the node whose adapter instances share this directory
++ * @param partition the cluster partition whose storage directory hosts the log directory
++ */
- public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition,
- ClusterPartition[] nodePartitions) {
++ public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
++ ClusterPartition partition) {
+ File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
- ClusterPartition nodePartition = nodePartitions[0];
+ String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- nodePartition.getPartitionId());
- // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
- File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
- + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
- return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f);
++ partition.getPartitionId());
++ // Note: feed adapter instances in a single node share the feed logger
++ // format: 'storage dir name'/partition_#/dataverse/feed/node
++ File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
++ return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
+ }
+
++ /**
++ * Returns one feed-log file split per location of an absolute partition constraint, using the first
++ * {@link ClusterPartition} of each node.
++ * NOTE(review): every non-COUNT constraint is cast to AlgebricksAbsolutePartitionConstraint (another type
++ * would fail with ClassCastException), and a node listed twice yields two splits for the same directory.
++ *
++ * @throws AsterixException if the constraint is a COUNT constraint
++ */
+ public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
+ AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
+ if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
+ throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
+ }
- File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
- String[] locations = null;
- locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
++ String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+ List<FileSplit> splits = new ArrayList<FileSplit>();
- String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
- int i = 0;
+ for (String nd : locations) {
- // Always get the first partition
- ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
- String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- nodePartition.getPartitionId());
- // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
- File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
- + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
- i++;
++ splits.add(splitsForAdapter(dataverseName, feedName, nd,
++ AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
+ }
+ return splits.toArray(new FileSplit[] {});
+ }
+
++ /** Resolves {@code relativePath} on I/O device {@code ioDeviceId} through {@code ioManager}. */
+ public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
+ return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
+ }
+
++ /**
++ * Creates a new {@link FeedLogManager} for {@code feedLogFileSplits[partition]}; same result as the
++ * single-split overload applied to that element. Every call constructs (and opens) a fresh manager.
++ */
+ public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
+ FileSplit[] feedLogFileSplits) throws HyracksDataException {
+ return new FeedLogManager(
+ FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
+ feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile());
+ }
+
++ /** Creates a new {@link FeedLogManager} rooted at the absolute path of {@code feedLogFileSplit}. */
+ public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
+ throws HyracksDataException {
+ return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getLocalFile().getFile().getPath(),
+ feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
+ }
+
++ /**
++ * Moves the last tuple of the {@code input} frame into {@code message}: the tuple bytes are copied into
++ * {@code message} (cleared before, flipped after), and the tuple count stored in {@code input} is rewritten
++ * as count - 1 so the frame no longer includes that tuple.
++ * NOTE(review): assumes the frame holds at least one tuple (tc would be -1 otherwise), that {@code input} is
++ * array-backed (input.array()), and that {@code message} has room for the tuple.
++ */
+ public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
+ // read the message and reduce the number of tuples
+ fta.reset(input);
+ int tc = fta.getTupleCount() - 1;
+ int offset = fta.getTupleStartOffset(tc);
+ int len = fta.getTupleLength(tc);
+ message.clear();
+ message.put(input.array(), offset, len);
+ message.flip();
+ IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
+ }
+
++ /** Returns 1; {@code configuration} is currently ignored. */
+ public static int getNumOfFields(Map<String, String> configuration) {
+ return 1;
+ }
+
++ /** Returns the value stored under {@code ExternalDataConstants.KEY_META_TYPE_NAME}, or {@code null} if absent. */
+ public static String getFeedMetaTypeName(Map<String, String> configuration) {
+ return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
+
+ }
+}