You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:05 UTC
[07/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
new file mode 100644
index 0000000..db38c12
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public class SocketClientAdapter implements IDataSourceAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName());
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static final long RECONNECT_PERIOD = 2000;
+
+ private final String localFile;
+
+ private final int port;
+
+ private boolean continueStreaming = true;
+
+ public SocketClientAdapter(Integer port, String localFile) {
+ this.localFile = localFile;
+ this.port = port;
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ Socket socket = waitForReceiver();
+ OutputStream os = socket.getOutputStream();
+ FileInputStream fin = new FileInputStream(new File(localFile));
+ byte[] chunk = new byte[1024];
+ int read;
+ try {
+ while (continueStreaming) {
+ read = fin.read(chunk);
+ if (read > 0) {
+ os.write(chunk, 0, read);
+ } else {
+ break;
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Finished streaming file " + localFile + "to port [" + port + "]");
+ }
+
+ } finally {
+ socket.close();
+ fin.close();
+ }
+
+ }
+
+ private Socket waitForReceiver() throws Exception {
+ Socket socket = null;
+ while (socket == null) {
+ try {
+ socket = new Socket(LOCALHOST, port);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
+ + " seconds before reconnecting");
+ }
+ Thread.sleep(RECONNECT_PERIOD);
+ }
+ }
+ return socket;
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ continueStreaming = false;
+ return true;
+ }
+
+ @Override
+ public boolean handleException(Throwable e) {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
new file mode 100644
index 0000000..a1e90a8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.util.Map;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for {@link SocketClientAdapter}. Socket addresses and the partition constraint are
+ * delegated to a {@link GenericSocketFeedAdapterFactory} configured with the same parameters;
+ * the files to stream come from the {@code file_splits} parameter.
+ */
+public class SocketClientAdapterFactory implements IAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Configuration key holding the comma separated list of file paths. */
+    public static final String KEY_FILE_SPLITS = "file_splits";
+
+    private ARecordType outputType;
+    private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
+    private String[] fileSplits;
+
+    /**
+     * Reads the file splits and configures the delegate socket factory.
+     *
+     * @throws IllegalArgumentException
+     *             if the {@code file_splits} parameter is absent
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
+        final String splitList = configuration.get(KEY_FILE_SPLITS);
+        if (null == splitList) {
+            throw new IllegalArgumentException(
+                    "File splits not specified. File split is specified as a comma separated list of paths");
+        }
+        this.fileSplits = splitList.trim().split(",");
+        this.genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
+        this.genericSocketAdapterFactory.configure(configuration, outputType);
+    }
+
+    @Override
+    public String getAlias() {
+        return ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER;
+    }
+
+    /** Returns the partition constraint computed by the delegate socket factory. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return genericSocketAdapterFactory.getPartitionConstraint();
+    }
+
+    /**
+     * Builds the adapter for one partition from the port of the partition's socket entry and the
+     * file split at the same index.
+     */
+    @Override
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        final Pair<String, Integer> address = genericSocketAdapterFactory.getSockets().get(partition);
+        final Integer targetPort = address.second;
+        final String split = fileSplits[partition];
+        return new SocketClientAdapter(targetPort, split);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
new file mode 100644
index 0000000..b5fd454
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.runtime.DataGenerator.InitializationInfo;
+import org.apache.asterix.external.runtime.DataGenerator.TweetMessage;
+import org.apache.asterix.external.runtime.DataGenerator.TweetMessageIterator;
+
+/**
+ * Produces synthetic tweets from a {@link DataGenerator} and pushes them, one per line and batched
+ * through a 32 KB buffer, to every registered subscriber {@link OutputStream}. Access to the
+ * subscriber list is guarded by an internal lock; the buffer and the counters are not, so tweet
+ * generation is expected to happen on a single thread.
+ */
+public class TweetGenerator {
+
+    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
+
+    private static final int DEFAULT_DURATION = INFINITY;
+    private static final int BUFFER_SIZE = 32 * 1024;
+
+    private int duration;
+    private TweetMessageIterator tweetIterator = null;
+    private int partition;
+    private long tweetCount = 0;
+    // tweets currently sitting in outputBuffer, not yet handed to the subscribers
+    private int frameTweetCount = 0;
+    // tweets contained in buffers that have been flushed so far
+    private int numFlushedTweets = 0;
+    private DataGenerator dataGenerator = null;
+    private ByteBuffer outputBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+    private String[] fields;
+    private final List<OutputStream> subscribers;
+    private final Object lock = new Object();
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
+
+    /**
+     * @param configuration
+     *            may contain {@link #KEY_DURATION} (an integer, defaults to {@link #INFINITY})
+     *            and {@link #KEY_FIELDS} (comma separated field names, optional)
+     * @param partition
+     *            partition number, only used in log output
+     * @throws Exception
+     *             if the duration is not a valid integer or the data generator cannot be created
+     */
+    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+        dataGenerator = new DataGenerator(new InitializationInfo());
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
+        this.subscribers = new ArrayList<OutputStream>();
+    }
+
+    /**
+     * Appends one tweet, followed by a newline, to the output buffer, flushing the buffer first
+     * when the tweet would not fit.
+     */
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+        // previously printed unconditionally to System.out; kept available for debugging only
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine(tweet);
+        }
+        tweetCount++;
+        // NOTE(review): encodes with the platform default charset, as before; confirm which
+        // encoding the receiving side expects.
+        byte[] b = tweet.getBytes();
+        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+            flush();
+        }
+        outputBuffer.put(b);
+        frameTweetCount++;
+    }
+
+    /**
+     * Writes the buffered bytes to every subscriber, drops subscribers whose write failed, resets
+     * the buffer and moves the buffered tweets into the flushed count. Doing the accounting here
+     * means the final flush at the end of the data is counted as well.
+     */
+    private void flush() throws IOException {
+        outputBuffer.flip();
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
+                } catch (Exception e) {
+                    // a subscriber that cannot be written to is removed after the iteration
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
+        outputBuffer.clear();
+        numFlushedTweets += frameTweetCount;
+        frameTweetCount = 0;
+    }
+
+    /**
+     * Generates up to {@code numTweets} tweets.
+     *
+     * @return {@code false} when the iterator had no more tweets on entry (any buffered data is
+     *         flushed in that case), {@code true} otherwise
+     */
+    public boolean generateNextBatch(int numTweets) throws Exception {
+        if (!tweetIterator.hasNext()) {
+            if (outputBuffer.position() > 0) {
+                flush();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
+            return false;
+        }
+        // stop early if the iterator runs out in the middle of a batch
+        for (int count = 0; count < numTweets && tweetIterator.hasNext(); count++) {
+            writeTweetString(tweetIterator.next());
+        }
+        return true;
+    }
+
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    /** Adds a stream that will receive every subsequently flushed buffer. */
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    /** Removes a previously registered stream; the stream itself is not closed. */
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    /**
+     * Closes every subscriber stream. All streams are attempted even if one of them fails.
+     *
+     * @throws IOException
+     *             the first failure encountered, rethrown after all streams were attempted
+     */
+    public void close() throws IOException {
+        IOException firstFailure = null;
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    if (firstFailure == null) {
+                        firstFailure = e;
+                    }
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /** Returns whether at least one subscriber is currently registered. */
+    public boolean isSubscribed() {
+        synchronized (lock) {
+            return !subscribers.isEmpty();
+        }
+    }
+
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
deleted file mode 100644
index a897294..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-
-/**
- * Resolves a value (DNS/IP Address) to the id of a Node Controller running at the location.
- */
-public class DNSResolver implements INodeResolver {
-
- // used to pick one node controller when several are found at the same address
- private static Random random = new Random();
-
- /**
-  * Resolves {@code value} with {@link InetAddress#getByName(String)}, looks up the node
-  * controllers reported for that address and returns the id of one of them, chosen at random.
-  *
-  * @param value
-  *            host name or IP address
-  * @return the id of a node controller found at the address
-  * @throws AsterixException
-  *             if the host name cannot be resolved, no node controller is found at the
-  *             address, or any other failure occurs
-  */
- @Override
- public String resolveNode(String value) throws AsterixException {
- try {
- InetAddress ipAddress = InetAddress.getByName(value);
- Set<String> nodeControllers = AsterixRuntimeUtil.getNodeControllersOnIP(ipAddress);
- if (nodeControllers == null || nodeControllers.isEmpty()) {
- throw new AsterixException(" No node controllers found at the address: " + value);
- }
- String chosenNCId = nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
- return chosenNCId;
- }catch (UnknownHostException e){
- // NOTE(review): the UnknownHostException is not attached as the cause of the new exception
- throw new AsterixException("Unable to resolve hostname '"+ value + "' to an IP address");
- } catch (AsterixException ae) {
- // rethrown unchanged so it is not wrapped a second time by the generic handler below
- throw ae;
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
index 6862d7a..f8585bb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.asterix.external.util;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
/**
- * Factory for creating instance of {@link DNSResolver}
+ * Factory for creating instance of {@link NodeResolver}
*/
public class DNSResolverFactory implements INodeResolverFactory {
- private static final INodeResolver INSTANCE = new DNSResolver();
+ private static final INodeResolver INSTANCE = new NodeResolver();
@Override
public INodeResolver createNodeResolver() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
new file mode 100644
index 0000000..ea13f25
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.common.parse.ITupleForwarder.TupleForwardPolicy;
+import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
+import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
+import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Static helpers for pushing tuples into frames and for selecting a tuple forwarding policy.
+ */
+public class DataflowUtils {
+    /**
+     * Appends the tuple held by {@code tb} to {@code appender}. When the current frame has no
+     * room, the frame is flushed to {@code writer} and the append is retried once.
+     *
+     * @throws HyracksDataException
+     *             if appending or flushing fails
+     * @throws IllegalStateException
+     *             if the tuple still cannot be appended after the flush
+     */
+    public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            appender.flush(writer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException(
+                        "Unable to append tuple of size " + tb.getSize() + " to the frame even after flushing it");
+            }
+        }
+    }
+
+    /**
+     * Creates the tuple forwarder selected by the {@link ITupleForwarder#FORWARD_POLICY}
+     * parameter. The value is matched case-insensitively against the names of
+     * {@link TupleForwardPolicy}; when the parameter is absent, the frame-full policy is used.
+     *
+     * @throws AsterixException
+     *             if the configured value does not name a known policy
+     */
+    public static ITupleForwarder getTupleForwarder(Map<String, String> configuration) throws AsterixException {
+        String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
+        TupleForwardPolicy policyType;
+        if (propValue == null) {
+            policyType = TupleForwardPolicy.FRAME_FULL;
+        } else {
+            try {
+                // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+                policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase(Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                // valueOf rejects unknown names; report them through the declared exception type
+                throw new AsterixException("Unknown tuple forward policy: " + propValue);
+            }
+        }
+        switch (policyType) {
+            case FRAME_FULL:
+                return new FrameFullTupleForwarder();
+            case COUNTER_TIMER_EXPIRED:
+                return new CounterTimerTupleForwarder();
+            case RATE_CONTROLLED:
+                return new RateControlledTupleForwarder();
+            default:
+                throw new AsterixException("Unknown tuple forward policy");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
new file mode 100644
index 0000000..7f91a2b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+
+public class ExternalDataCompatibilityUtils {
+
+ public static void validateCompatibility(IExternalDataSourceFactory dataSourceFactory,
+ IDataParserFactory dataParserFactory) throws AsterixException {
+ if (dataSourceFactory.getDataSourceType() != dataParserFactory.getDataSourceType()) {
+ throw new AsterixException(
+ "datasource-parser mismatch. datasource produces " + dataSourceFactory.getDataSourceType()
+ + " and parser expects " + dataParserFactory.getDataSourceType());
+ }
+ if (dataSourceFactory.getDataSourceType() == DataSourceType.RECORDS) {
+ IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
+ IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
+ if (!recordParserFactory.getRecordClass().isAssignableFrom(recordReaderFactory.getRecordClass())) {
+ throw new AsterixException("datasource-parser mismatch. datasource produces records of type "
+ + recordReaderFactory.getRecordClass() + " and parser expects records of type "
+ + recordParserFactory.getRecordClass());
+ }
+ }
+ }
+
+ //TODO:Add remaining aliases
+ public static void addCompatabilityParameters(String adapterClassname, ARecordType itemType,
+ Map<String, String> configuration) throws AsterixException {
+ if (adapterClassname.equals(ExternalDataConstants.ALIAS_HDFS_ADAPTER)
+ || adapterClassname.equalsIgnoreCase(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME)) {
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new AsterixException("Unspecified format parameter for HDFS adapter");
+ }
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_BINARY)
+ || configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_HIVE)) {
+ configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_HDFS);
+ } else {
+ configuration.put(ExternalDataConstants.KEY_READER,
+ configuration.get(ExternalDataConstants.KEY_FORMAT));
+ configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_HDFS_ADAPTER);
+ }
+ }
+ if (adapterClassname.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)
+ || adapterClassname.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME)) {
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new AsterixException("Unspecified format parameter for local file system adapter");
+ }
+ configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT));
+ configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER);
+ }
+ if (configuration.get(ExternalDataConstants.KEY_PARSER) != null
+ && configuration.get(ExternalDataConstants.KEY_PARSER).equals(ExternalDataConstants.PARSER_HIVE)) {
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_HIVE);
+ }
+ if (configuration.get(ExternalDataConstants.KEY_FILESYSTEM) != null) {
+ configuration.put(ExternalDataConstants.KEY_STREAM,
+ configuration.get(ExternalDataConstants.KEY_FILESYSTEM));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
new file mode 100644
index 0000000..2050e6a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Constants for the external data module: configuration keys, Hadoop class names, aliases of
+ * the built-in readers, formats, streams and adapters, and a few character and size defaults.
+ */
+public class ExternalDataConstants {
+ //TODO: Remove unused variables.
+ /**
+ * Keys
+ */
+ // used to specify the stream factory for an adapter that has a stream data source
+ public static final String KEY_STREAM = "stream";
+ // used to specify the dataverse of the adapter
+ public static final String KEY_DATAVERSE = "dataverse";
+ // used to specify the socket addresses when reading data from sockets
+ public static final String KEY_SOCKETS = "sockets";
+ // specify whether the socket address points to an NC or an IP
+ public static final String KEY_MODE = "address-type";
+ // specify the hdfs name node address when reading hdfs data
+ public static final String KEY_HDFS_URL = "hdfs";
+ // specify the path when reading from a file system
+ public static final String KEY_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ public static final String KEY_FILESYSTEM = "fs";
+ // Hadoop configuration property names
+ public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+ public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
+ public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
+ public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
+ public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
+ public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
+ public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
+ // NOTE: same value ("type-name") as KEY_TYPE_NAME below
+ public static final String KEY_SOURCE_DATATYPE = "type-name";
+ public static final String KEY_DELIMITER = "delimiter";
+ public static final String KEY_PARSER_FACTORY = "tuple-parser";
+ // NOTE: same value ("parser") as KEY_PARSER below
+ public static final String KEY_DATA_PARSER = "parser";
+ public static final String KEY_HEADER = "header";
+ public static final String KEY_READER = "reader";
+ public static final String KEY_READER_STREAM = "reader-stream";
+ public static final String KEY_TYPE_NAME = "type-name";
+ public static final String KEY_RECORD_START = "record-start";
+ public static final String KEY_RECORD_END = "record-end";
+ public static final String KEY_EXPRESSION = "expression";
+ public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
+ public static final String KEY_FORMAT = "format";
+ public static final String KEY_QUOTE = "quote";
+ public static final String KEY_PARSER = "parser";
+ public static final String KEY_DATASET_RECORD = "dataset-record";
+ public static final String KEY_HIVE_SERDE = "hive-serde";
+ public static final String KEY_RSS_URL = "url";
+ public static final String KEY_INTERVAL = "interval";
+ public static final String KEY_PULL = "pull";
+ public static final String KEY_PUSH = "push";
+ /**
+ * HDFS class names
+ */
+ public static final String CLASS_NAME_TEXT_INPUT_FORMAT = TextInputFormat.class.getName();
+ public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = SequenceFileInputFormat.class.getName();
+ public static final String CLASS_NAME_RC_INPUT_FORMAT = RCFileInputFormat.class.getName();
+ public static final String CLASS_NAME_HDFS_FILESYSTEM = DistributedFileSystem.class.getName();
+ /**
+ * input formats aliases
+ */
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+ public static final String INPUT_FORMAT_RC = "rc-input-format";
+ /**
+ * Builtin streams (no constants are declared in this section; the STREAM_* aliases are
+ * listed under "input streams" further down)
+ */
+
+ /**
+ * Builtin record readers
+ */
+ public static final String READER_HDFS = "hdfs";
+ public static final String READER_ADM = "adm";
+ public static final String READER_SEMISTRUCTURED = "semi-structured";
+ public static final String READER_DELIMITED = "delimited-text";
+
+ // miscellaneous parameter names, aliases and default values
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static final String SCHEDULER = "hdfs-scheduler";
+ public static final String PARSER_HIVE = "hive-parser";
+ public static final String HAS_HEADER = "has.header";
+ public static final String TIME_TRACKING = "time.tracking";
+ public static final String DEFAULT_QUOTE = "\"";
+ public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+ public static final String DEFAULT_DELIMITER = ",";
+ public static final String EXTERNAL_LIBRARY_SEPARATOR = "#";
+ public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
+ /**
+ * supported builtin record formats
+ */
+ public static final String FORMAT_HIVE = "hive";
+ public static final String FORMAT_BINARY = "binary";
+ public static final String FORMAT_ADM = "adm";
+ public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+ public static final String FORMAT_TWEET = "tweet";
+ public static final String FORMAT_RSS = "rss";
+
+ /**
+ * input streams
+ */
+ public static final String STREAM_HDFS = "hdfs";
+ public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
+ public static final String STREAM_SOCKET = "socket";
+
+ /**
+ * adapter aliases
+ */
+ public static final String ALIAS_GENERIC_ADAPTER = "adapter";
+ public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
+ public static final String ALIAS_HDFS_ADAPTER = "hdfs";
+ public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
+ public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
+ public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
+ public static final String ALIAS_RSS_ADAPTER = "rss_feed";
+ public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
+ public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
+ public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
+ public static final String ALIAS_TWITTER_AZURE_ADAPTER = "azure_twitter";
+ public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
+
+ /**
+ * For backward compatibility: class names of the older file system adapters
+ */
+ public static final String ADAPTER_LOCALFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter";
+ public static final String ADAPTER_HDFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.HDFSAdapter";
+
+ /**
+ * Constant characters
+ */
+ public static final char ESCAPE = '\\';
+ public static final char QUOTE = '"';
+ public static final char SPACE = ' ';
+ public static final char TAB = '\t';
+ public static final char LF = '\n';
+ public static final char CR = '\r';
+ public static final char DEFAULT_RECORD_START = '{';
+ public static final char DEFAULT_RECORD_END = '}';
+
+ /**
+ * Constant byte characters
+ */
+ public static final byte EOL = '\n';
+ public static final byte BYTE_CR = '\r';
+ /**
+ * Size default values
+ */
+ public static final int DEFAULT_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+
+ /**
+ * Expected parameter values
+ */
+ public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
new file mode 100644
index 0000000..9dcaef4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class ExternalDataExceptionUtils {
+ public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
+ public static final String MISSING_PARAMETER = "Missing parameter.\n";
+ public static final String PARAMETER_NAME = "Parameter name: ";
+ public static final String EXPECTED_VALUE = "Expected value: ";
+ public static final String PASSED_VALUE = "Passed value: ";
+
+ public static String incorrectParameterMessage(String parameterName, String expectedValue, String passedValue) {
+ return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
+ + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
new file mode 100644
index 0000000..7c1c1b5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.FeedConstants;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+
+public class ExternalDataUtils {
+
+ /**
+  * Reads the field delimiter from the given configuration.
+  * Falls back to the first character of {@code ExternalDataConstants.DEFAULT_DELIMITER}
+  * when no delimiter is configured.
+  *
+  * @throws AsterixException
+  *             if a delimiter is configured but is not exactly one character long
+  */
+ public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+     String configured = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+     if (configured == null) {
+         return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
+     }
+     if (configured.length() != 1) {
+         throw new AsterixException(
+                 "'" + configured + "' is not a valid delimiter. The length of a delimiter should be 1.");
+     }
+     return configured.charAt(0);
+ }
+
+ // Get a quote from the given configuration when the delimiter is given
+ // Need to pass delimiter to check whether they share the same character
+ public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+ String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
+ if (quoteValue == null) {
+ quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
+ } else if (quoteValue.length() != 1) {
+ throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+ }
+
+ // Since delimiter (char type value) can't be null,
+ // we only check whether delimiter and quote use the same character
+ if (quoteValue.charAt(0) == delimiter) {
+ throw new AsterixException(
+ "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
+ }
+
+ return quoteValue.charAt(0);
+ }
+
+ // Get the header flag
+ public static boolean getHasHeader(Map<String, String> configuration) {
+ return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
+ }
+
+ public static DataSourceType getDataSourceType(Map<String, String> configuration) throws AsterixException {
+ if (isDataSourceStreamProvider(configuration)) {
+ return DataSourceType.STREAM;
+ } else if (isDataSourceRecordReader(configuration)) {
+ return DataSourceType.RECORDS;
+ } else {
+ throw new AsterixException(
+ "unable to determine whether input is a stream provider or a record reader. parameters: "
+ + ExternalDataConstants.KEY_STREAM + " or " + ExternalDataConstants.KEY_READER
+ + " must be specified");
+ }
+ }
+
+ /**
+  * Returns {@code true} when the string contains the external library separator
+  * and its trimmed form is longer than one character.
+  */
+ public static boolean isExternal(String aString) {
+     if (!aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)) {
+         return false;
+     }
+     return aString.trim().length() > 1;
+ }
+
+ public static ClassLoader getClassLoader(String dataverse, String library) {
+ return ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+ }
+
+ public static String getLibraryName(String aString) {
+ return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+ }
+
+ public static String getExternalClassName(String aString) {
+ return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
+ }
+
+ public static IInputStreamProviderFactory createExternalInputStreamFactory(String dataverse, String stream)
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ String libraryName = getLibraryName(stream);
+ String className = getExternalClassName(stream);
+ ClassLoader classLoader = getClassLoader(dataverse, libraryName);
+ return ((IInputStreamProviderFactory) (classLoader.loadClass(className).newInstance()));
+ }
+
+ public static String getDataverse(Map<String, String> configuration) {
+ return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
+ }
+
+ public static boolean isDataSourceStreamProvider(Map<String, String> configuration) {
+ return configuration.containsKey(ExternalDataConstants.KEY_STREAM);
+ }
+
+ /**
+  * Returns {@code true} when the configuration contains the reader key.
+  */
+ private static boolean isDataSourceRecordReader(Map<String, String> configuration) {
+     boolean readerConfigured = configuration.containsKey(ExternalDataConstants.KEY_READER);
+     return readerConfigured;
+ }
+
+ /**
+  * Returns the record format: the value of the data parser key when present,
+  * otherwise the value of the format key (which may itself be {@code null}).
+  */
+ public static String getRecordFormat(Map<String, String> configuration) {
+     String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+     if (parserFormat == null) {
+         return configuration.get(ExternalDataConstants.KEY_FORMAT);
+     }
+     return parserFormat;
+ }
+
+ // Type tag -> value parser factory lookup used by getValueParserFactories.
+ private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+ /**
+  * Builds the lookup table from type tag to value parser factory.
+  * Only INT32, INT64, FLOAT, DOUBLE and STRING have an entry.
+  */
+ private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+     Map<ATypeTag, IValueParserFactory> factoriesByTag = new HashMap<ATypeTag, IValueParserFactory>();
+     // integral types
+     factoriesByTag.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+     factoriesByTag.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+     // floating point types
+     factoriesByTag.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+     factoriesByTag.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+     // text
+     factoriesByTag.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+     return factoriesByTag;
+ }
+
+ /**
+  * Returns one value parser factory per field of the given record type, in field order.
+  * <p>
+  * A field whose type is a UNION is accepted only when it is an optional type, i.e. a
+  * union of exactly two members whose first member is NULL; the parser is then chosen
+  * from the second member. Any other field uses its own type tag.
+  *
+  * @param recordType
+  *            the record type whose fields are to be parsed
+  * @return an array with the parser factory for each field
+  * @throws NotImplementedException
+  *             if a union field is not an optional type, if a field's type tag
+  *             is {@code null}, or if no parser factory is registered for the tag
+  */
+ public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
+     IAType[] fieldTypes = recordType.getFieldTypes();
+     int n = fieldTypes.length;
+     IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+     for (int i = 0; i < n; i++) {
+         ATypeTag tag = null;
+         if (fieldTypes[i].getTypeTag() == ATypeTag.UNION) {
+             List<IAType> unionTypes = ((AUnionType) fieldTypes[i]).getUnionList();
+             // Reject the union unless BOTH conditions of an optional type hold.
+             // (Using && here would let through, e.g., a two-member union whose
+             // first member is not NULL.)
+             if (unionTypes.size() != 2 || unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
+                 throw new NotImplementedException("Non-optional UNION type is not supported.");
+             }
+             tag = unionTypes.get(1).getTypeTag();
+         } else {
+             tag = fieldTypes[i].getTypeTag();
+         }
+         if (tag == null) {
+             throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+         }
+         IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+         if (vpf == null) {
+             throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+         }
+         fieldParserFactories[i] = vpf;
+     }
+     return fieldParserFactories;
+ }
+
+ public static String getRecordReaderStreamName(Map<String, String> configuration) {
+ return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
+ }
+
+ /**
+  * Returns {@code true} only when a header value is configured and it parses as boolean true.
+  */
+ public static boolean hasHeader(Map<String, String> configuration) {
+     String headerValue = configuration.get(ExternalDataConstants.KEY_HEADER);
+     return headerValue != null && Boolean.parseBoolean(headerValue);
+ }
+
+ /**
+  * Returns {@code true} only when the pull key is configured and its value parses as boolean true.
+  */
+ public static boolean isPull(Map<String, String> configuration) {
+     String pullValue = configuration.get(ExternalDataConstants.KEY_PULL);
+     return pullValue != null && Boolean.parseBoolean(pullValue);
+ }
+
+ /**
+  * Returns {@code true} only when the push key is configured and its value parses as boolean true.
+  */
+ public static boolean isPush(Map<String, String> configuration) {
+     String pushValue = configuration.get(ExternalDataConstants.KEY_PUSH);
+     return pushValue != null && Boolean.parseBoolean(pushValue);
+ }
+
+ public static IRecordReaderFactory<?> createExternalRecordReaderFactory(String dataverse, String reader)
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ String library = reader.substring(0, reader.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
+ ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+ return (IRecordReaderFactory<?>) classLoader
+ .loadClass(reader.substring(reader.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
+ .newInstance();
+ }
+
+ public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName)
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ String library = parserFactoryName.substring(0,
+ parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
+ ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+ return (IDataParserFactory) classLoader
+ .loadClass(parserFactoryName
+ .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
+ .newInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
new file mode 100644
index 0000000..de6737a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.indexing.RecordId.RecordIdType;
+import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.ICCContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
+public class HDFSUtils {
+
+ /**
+  * Creates an HDFS {@link Scheduler} from the cluster controller's client
+  * network address and port.
+  *
+  * @return the newly created scheduler
+  * @throws IllegalStateException
+  *             if the scheduler cannot be created; the underlying
+  *             {@link HyracksException} is attached as the cause
+  */
+ public static Scheduler initializeHDFSScheduler() {
+     ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+     try {
+         return new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+                 ccContext.getClusterControllerInfo().getClientNetPort());
+     } catch (HyracksException e) {
+         // Keep the original failure so the root cause is visible in logs.
+         throw new IllegalStateException("Cannot obtain hdfs scheduler", e);
+     }
+ }
+
+ /**
+  * Creates an {@link IndexingScheduler} from the cluster controller's client
+  * network address and port.
+  *
+  * @return the newly created indexing scheduler
+  * @throws IllegalStateException
+  *             if the scheduler cannot be created; the underlying
+  *             {@link HyracksException} is attached as the cause
+  */
+ public static IndexingScheduler initializeIndexingHDFSScheduler() {
+     ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+     try {
+         return new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+                 ccContext.getClusterControllerInfo().getClientNetPort());
+     } catch (HyracksException e) {
+         // Keep the original failure so the root cause is visible in logs.
+         throw new IllegalStateException("Cannot obtain hdfs scheduler", e);
+     }
+ }
+
+ /**
+  * Builds the input splits manually instead of asking the input format for them.
+  * One split is produced per HDFS file block, irrespective of the number of partitions,
+  * and the produced splits only cover the intersection between the files currently in
+  * HDFS and the files recorded internally in AsterixDB:
+  * <ol>
+  * <li>NoOp means appended file</li>
+  * <li>AddOp means new file</li>
+  * <li>UpdateOp means the delta of a file</li>
+  * </ol>
+  * Files that no longer exist in HDFS, or whose modification time differs from the
+  * recorded one, contribute no splits.
+  * <p>
+  * Side effect: {@code files} is cleared and refilled with one entry per produced
+  * split, in the same order as the returned array.
+  *
+  * @param conf
+  *            job configuration used to obtain the file system
+  * @param files
+  *            the external files to split; rewritten as described above
+  * @return the produced file splits
+  * @throws IOException
+  *             if the file system cannot be accessed
+  */
+ public static InputSplit[] getSplits(JobConf conf, List<ExternalFile> files) throws IOException {
+     ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
+     ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
+     // try-with-resources closes the file system even when a lookup fails midway.
+     try (FileSystem fs = FileSystem.get(conf)) {
+         // Create files splits
+         for (ExternalFile file : files) {
+             Path filePath = new Path(file.getFileName());
+             FileStatus fileStatus;
+             try {
+                 fileStatus = fs.getFileStatus(filePath);
+             } catch (FileNotFoundException e) {
+                 // file was deleted at some point, skip to next file
+                 continue;
+             }
+             if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+                     && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                 // Get its information from HDFS name node
+                 BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+                 // Create a split per block, truncating the last one at the recorded file size
+                 for (BlockLocation block : fileBlocks) {
+                     if (block.getOffset() < file.getSize()) {
+                         long length = (block.getLength() + block.getOffset()) < file.getSize()
+                                 ? block.getLength() : (file.getSize() - block.getOffset());
+                         fileSplits.add(new FileSplit(filePath, block.getOffset(), length, block.getHosts()));
+                         orderedExternalFiles.add(file);
+                     }
+                 }
+             } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+                     && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                 long oldSize = 0L;
+                 long newSize = file.getSize();
+                 // Look for another entry of the same file with a different size: if found,
+                 // only the range between the two sizes is split.
+                 for (int i = 0; i < files.size(); i++) {
+                     // Names must be compared by value; == only compares references.
+                     if (file.getFileName().equals(files.get(i).getFileName())
+                             && files.get(i).getSize() != file.getSize()) {
+                         newSize = files.get(i).getSize();
+                         oldSize = file.getSize();
+                         break;
+                     }
+                 }
+
+                 // Get its information from HDFS name node
+                 BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+                 // Create a split per block
+                 for (BlockLocation block : fileBlocks) {
+                     if (block.getOffset() + block.getLength() > oldSize) {
+                         if (block.getOffset() < newSize) {
+                             // Block intersects the delta -> create a split trimmed to the delta
+                             long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+                             long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+                                     : block.getOffset() + block.getLength() - newSize;
+                             long splitLength = block.getLength() - startCut - endCut;
+                             fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                                     block.getHosts()));
+                             orderedExternalFiles.add(file);
+                         }
+                     }
+                 }
+             }
+         }
+     }
+     files.clear();
+     files.addAll(orderedExternalFiles);
+     return fileSplits.toArray(new FileSplit[fileSplits.size()]);
+ }
+
+ /**
+  * Maps the trimmed input format parameter to an input format class name.
+  * The text, sequence and RC aliases map to their predefined class names;
+  * any other value is returned as is.
+  */
+ public static String getInputFormatClassName(Map<String, String> configuration) {
+     String requestedFormat = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+     if (ExternalDataConstants.INPUT_FORMAT_TEXT.equals(requestedFormat)) {
+         return ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT;
+     }
+     if (ExternalDataConstants.INPUT_FORMAT_SEQUENCE.equals(requestedFormat)) {
+         return ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT;
+     }
+     if (ExternalDataConstants.INPUT_FORMAT_RC.equals(requestedFormat)) {
+         return ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT;
+     }
+     return requestedFormat;
+ }
+
+ /**
+  * Maps the trimmed input format parameter to an input format class.
+  * The text, sequence and RC aliases map to their Hadoop input format classes;
+  * any other value is treated as a class name and loaded with {@link Class#forName(String)}.
+  *
+  * @throws ClassNotFoundException
+  *             if a non-alias value does not name a loadable class
+  */
+ public static Class<?> getInputFormatClass(Map<String, String> configuration) throws ClassNotFoundException {
+     String requestedFormat = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+     if (ExternalDataConstants.INPUT_FORMAT_TEXT.equals(requestedFormat)) {
+         return TextInputFormat.class;
+     }
+     if (ExternalDataConstants.INPUT_FORMAT_SEQUENCE.equals(requestedFormat)) {
+         return SequenceFileInputFormat.class;
+     }
+     if (ExternalDataConstants.INPUT_FORMAT_RC.equals(requestedFormat)) {
+         return RCFileInputFormat.class;
+     }
+     return Class.forName(requestedFormat);
+ }
+
+ /**
+  * Creates a {@link JobConf} for reading from HDFS based on the adapter configuration:
+  * file system URI and class, class loader, input directory and input format.
+  * Short-circuit local reads are switched on only when a local socket path is configured.
+  */
+ public static JobConf configureHDFSJobConf(Map<String, String> configuration) throws Exception {
+     JobConf jobConf = new JobConf();
+
+     String socketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
+     String inputFormatClassName = HDFSUtils.getInputFormatClassName(configuration);
+
+     String hdfsUrl = configuration.get(ExternalDataConstants.KEY_HDFS_URL).trim();
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, hdfsUrl);
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS,
+             ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
+     jobConf.setClassLoader(HDFSInputStreamProvider.class.getClassLoader());
+
+     String inputPath = configuration.get(ExternalDataConstants.KEY_PATH).trim();
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, inputPath);
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, inputFormatClassName);
+
+     if (socketPath == null) {
+         // No socket path supplied: leave short-circuit reads untouched.
+         return jobConf;
+     }
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_SHORT_CIRCUIT, "true");
+     jobConf.set(ExternalDataConstants.KEY_HADOOP_SOCKET_PATH, socketPath.trim());
+     return jobConf;
+ }
+
+ /**
+  * Returns the given constraint unchanged when it is not {@code null}. Otherwise builds
+  * an absolute partition constraint that lists every node of the stores map once per
+  * IO device reported for that node.
+  */
+ public static AlgebricksPartitionConstraint getPartitionConstraints(
+         AlgebricksPartitionConstraint clusterLocations) {
+     if (clusterLocations != null) {
+         return clusterLocations;
+     }
+     ArrayList<String> nodeLocations = new ArrayList<String>();
+     Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+     for (String nodeId : stores.keySet()) {
+         int ioDeviceCount = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nodeId);
+         for (int remaining = ioDeviceCount; remaining > 0; remaining--) {
+             nodeLocations.add(nodeId);
+         }
+     }
+     String[] locationArray = nodeLocations.toArray(new String[nodeLocations.size()]);
+     return new AlgebricksAbsolutePartitionConstraint(locationArray);
+ }
+
+ public static RecordIdType getRecordIdType(Map<String, String> configuration) {
+ String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+ switch (inputFormatParameter) {
+ case ExternalDataConstants.INPUT_FORMAT_TEXT:
+ case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+ return RecordIdType.OFFSET;
+ case ExternalDataConstants.INPUT_FORMAT_RC:
+ return RecordIdType.RC;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
deleted file mode 100644
index 3a92b97..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-/**
- * A policy for resolving a name to a node controller id.
- */
-public interface INodeResolver {
-
- /**
- * Resolve a passed-in value to a node controller id.
- *
- * @param value
- * string to be resolved
- * @return resolved result (a node controller id)
- * @throws AsterixException
- */
- public String resolveNode(String value) throws AsterixException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
deleted file mode 100644
index b3c459b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-/**
- * Factory for creating an instance of INodeResolver
- *
- * @see INodeResolver
- */
-public interface INodeResolverFactory {
-
- /**
- * Create an instance of {@link INodeResolver}
- *
- * @return an instance of INodeResolver
- */
- public INodeResolver createNodeResolver();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
index 776061f..582189a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.util;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
/**
* Factory for creating an instance of @see {IdentityResolver}.
* Identity resolver simply resolves a value to itself and is useful when value being resolved
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
index 2b792b2..bda5f1e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.util;
+import org.apache.asterix.external.api.INodeResolver;
+
/**
* Identity resolver simply resolves a value to itself and is useful when value being resolved
* is a node controller id.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
new file mode 100644
index 0000000..61764d7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+
+/**
+ * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
+ * <p>
+ * The known node controllers are cached in static collections that are filled on
+ * first use and refreshed when a value cannot be matched against the cached NC ids.
+ * NOTE(review): the caches are written under locks in {@code updateNCs} but read
+ * without synchronization in {@code resolveNode} -- confirm whether concurrent
+ * callers are expected.
+ */
+public class NodeResolver implements INodeResolver {
+    private static final Random random = new Random();
+    // IP address -> ids of the node controllers running at that address
+    private static final Map<InetAddress, Set<String>> ncMap = new HashMap<InetAddress, Set<String>>();
+    // ids of all known node controllers
+    private static final Set<String> ncs = new HashSet<String>();
+
+    /**
+     * Resolves {@code value} to a node controller id.
+     * <p>
+     * If {@code value} resolves to an IP address, one of the node controllers registered
+     * at that address is picked at random. Otherwise {@code value} is returned as is when
+     * it is a known node controller id (the cache is refreshed once before giving up).
+     *
+     * @param value
+     *            a host name, an IP address or a node controller id
+     * @return the resolved node controller id
+     * @throws AsterixException
+     *             if the value is neither a resolvable address nor a known NC id, if no
+     *             node controller runs at the resolved address, or if refreshing the
+     *             cache fails; the underlying exception is attached as the cause
+     */
+    @Override
+    public String resolveNode(String value) throws AsterixException {
+        UnknownHostException uhe = null;
+        try {
+            if (ncMap.isEmpty()) {
+                NodeResolver.updateNCs();
+            }
+            InetAddress ipAddress = null;
+            try {
+                ipAddress = InetAddress.getByName(value);
+            } catch (UnknownHostException e) {
+                // Not a resolvable address; remember why and try it as an NC id below.
+                uhe = e;
+            }
+            if (ipAddress == null) {
+                if (ncs.contains(value)) {
+                    return value;
+                }
+                // The id may belong to a node that joined after the last refresh.
+                NodeResolver.updateNCs();
+                if (ncs.contains(value)) {
+                    return value;
+                }
+                throw new AsterixException("address passed: '" + value
+                        + "' couldn't be resolved to an ip address and is not an NC id. Existing NCs are "
+                        + ncs.toString(), uhe);
+            }
+            Set<String> nodeControllers = ncMap.get(ipAddress);
+            if (nodeControllers == null || nodeControllers.isEmpty()) {
+                throw new AsterixException(" No node controllers found at the address: " + value);
+            }
+            String chosenNCId = nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
+            return chosenNCId;
+        } catch (UnknownHostException e) {
+            // Preserve the original exception as the cause instead of dropping it.
+            throw new AsterixException("Unable to resolve hostname '" + value + "' to an IP address", e);
+        } catch (AsterixException ae) {
+            throw ae;
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    /**
+     * Rebuilds both caches: clears and refills the address-to-NC map, then
+     * recomputes the set of all NC ids from it.
+     */
+    private static void updateNCs() throws Exception {
+        synchronized (ncMap) {
+            ncMap.clear();
+            //TODO: change this call and replace by calling AsterixClusterProperties
+            AsterixRuntimeUtil.getNodeControllerMap(ncMap);
+            synchronized (ncs) {
+                ncs.clear();
+                for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
+                    ncs.addAll(entry.getValue());
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/resources/adm.grammar
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/resources/adm.grammar b/asterix-external-data/src/main/resources/adm.grammar
new file mode 100644
index 0000000..1910436
--- /dev/null
+++ b/asterix-external-data/src/main/resources/adm.grammar
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# LEXER GENERATOR configuration file
+# ---------------------------------------
+# Place *first* the generic configuration
+# then list your grammar.
+
+PACKAGE: org.apache.asterix.runtime.operators.file.adm
+LEXER_NAME: AdmLexer
+
+TOKENS:
+
+BOOLEAN_CONS = string(boolean)
+INT8_CONS = string(int8)
+INT16_CONS = string(int16)
+INT32_CONS = string(int32)
+INT64_CONS = string(int64)
+INT64_CONS = string(int)
+FLOAT_CONS = string(float)
+DOUBLE_CONS = string(double)
+DATE_CONS = string(date)
+DATETIME_CONS = string(datetime)
+DURATION_CONS = string(duration)
+STRING_CONS = string(string)
+HEX_CONS = string(hex)
+BASE64_CONS = string(base64)
+POINT_CONS = string(point)
+POINT3D_CONS = string(point3d)
+LINE_CONS = string(line)
+POLYGON_CONS = string(polygon)
+RECTANGLE_CONS = string(rectangle)
+CIRCLE_CONS = string(circle)
+TIME_CONS = string(time)
+INTERVAL_TIME_CONS = string(interval-time)
+INTERVAL_DATE_CONS = string(interval-date)
+INTERVAL_DATETIME_CONS = string(interval-datetime)
+YEAR_MONTH_DURATION_CONS = string(year-month-duration)
+DAY_TIME_DURATION_CONS = string(day-time-duration)
+UUID_CONS = string(uuid)
+
+NULL_LITERAL = string(null)
+TRUE_LITERAL = string(true)
+FALSE_LITERAL = string(false)
+
+CONSTRUCTOR_OPEN = char(()
+CONSTRUCTOR_CLOSE = char())
+START_RECORD = char({)
+END_RECORD = char(})
+COMMA = char(\,)
+COLON = char(:)
+START_ORDERED_LIST = char([)
+END_ORDERED_LIST = char(])
+START_UNORDERED_LIST = string({{)
+# END_UNORDERED_LIST = }} is recognized as a double END_RECORD token
+
+STRING_LITERAL = char("), anythingUntil(")
+
+INT_LITERAL = signOrNothing(), digitSequence()
+INT8_LITERAL = token(INT_LITERAL), string(i8)
+INT16_LITERAL = token(INT_LITERAL), string(i16)
+INT32_LITERAL = token(INT_LITERAL), string(i32)
+INT64_LITERAL = token(INT_LITERAL), string(i64)
+
+@EXPONENT = caseInsensitiveChar(e), signOrNothing(), digitSequence()
+
+DOUBLE_LITERAL = signOrNothing(), char(.), digitSequence()
+DOUBLE_LITERAL = signOrNothing(), digitSequence(), char(.), digitSequence()
+DOUBLE_LITERAL = signOrNothing(), digitSequence(), char(.), digitSequence(), token(@EXPONENT)
+DOUBLE_LITERAL = signOrNothing(), digitSequence(), token(@EXPONENT)
+
+FLOAT_LITERAL = token(DOUBLE_LITERAL), caseInsensitiveChar(f)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
index 2c1f02a..db693a1 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class AddHashTagsFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
index bca508f..db717e6 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
@@ -23,6 +23,8 @@ import org.apache.asterix.external.library.java.JObjects.JPoint;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
import org.apache.asterix.external.util.Datatypes;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
index aec9e5d..a13da84 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.library;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class AddHashTagsInPlaceFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
index 2765225..399f0f9 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.library;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
import org.apache.asterix.external.util.Datatypes;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
index dc0ab7a..9050462 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class AllTypesFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
index 12ce871..8f65bee 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
@@ -35,6 +35,8 @@ import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
import org.apache.asterix.external.library.java.JObjects.JTime;
import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
public class AllTypesFunction implements IExternalScalarFunction {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
index 7d1d3da..e15cb3d 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class CapitalFinderFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
index 21467af..969e109 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
@@ -23,6 +23,8 @@ import java.util.Properties;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
public class CapitalFinderFunction implements IExternalScalarFunction {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
index 21ad776..5d8126b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.library;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class EchoDelayFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
index e564ca0..c115ac4 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.library;
import java.util.Random;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JObjects.JRecord;
public class EchoDelayFunction implements IExternalScalarFunction {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
index db3a5fa..5515ebd 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class ParseTweetFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
index caa0544..b9c736a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.library;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
public class ParseTweetFunction implements IExternalScalarFunction {