You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:05 UTC

[07/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
new file mode 100644
index 0000000..db38c12
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public class SocketClientAdapter implements IDataSourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName());
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private static final long RECONNECT_PERIOD = 2000;
+
+    private final String localFile;
+
+    private final int port;
+
+    private boolean continueStreaming = true;
+
+    public SocketClientAdapter(Integer port, String localFile) {
+        this.localFile = localFile;
+        this.port = port;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        Socket socket = waitForReceiver();
+        OutputStream os = socket.getOutputStream();
+        FileInputStream fin = new FileInputStream(new File(localFile));
+        byte[] chunk = new byte[1024];
+        int read;
+        try {
+            while (continueStreaming) {
+                read = fin.read(chunk);
+                if (read > 0) {
+                    os.write(chunk, 0, read);
+                } else {
+                    break;
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Finished streaming file " + localFile + "to port [" + port + "]");
+            }
+
+        } finally {
+            socket.close();
+            fin.close();
+        }
+
+    }
+
+    private Socket waitForReceiver() throws Exception {
+        Socket socket = null;
+        while (socket == null) {
+            try {
+                socket = new Socket(LOCALHOST, port);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
+                            + " seconds before reconnecting");
+                }
+                Thread.sleep(RECONNECT_PERIOD);
+            }
+        }
+        return socket;
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        continueStreaming = false;
+        return true;
+    }
+
+    @Override
+    public boolean handleException(Throwable e) {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
new file mode 100644
index 0000000..a1e90a8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.util.Map;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class SocketClientAdapterFactory implements IAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ARecordType outputType;
+
+    private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
+
+    private String[] fileSplits;
+
+    public static final String KEY_FILE_SPLITS = "file_splits";
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
+        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
+        if (fileSplitsValue == null) {
+            throw new IllegalArgumentException(
+                    "File splits not specified. File split is specified as a comma separated list of paths");
+        }
+        fileSplits = fileSplitsValue.trim().split(",");
+        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
+        genericSocketAdapterFactory.configure(configuration, outputType);
+    }
+
+    @Override
+    public String getAlias() {
+        return ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return genericSocketAdapterFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        Pair<String, Integer> socket = genericSocketAdapterFactory.getSockets().get(partition);
+        return new SocketClientAdapter(socket.second, fileSplits[partition]);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
new file mode 100644
index 0000000..b5fd454
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.runtime;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.runtime.DataGenerator.InitializationInfo;
+import org.apache.asterix.external.runtime.DataGenerator.TweetMessage;
+import org.apache.asterix.external.runtime.DataGenerator.TweetMessageIterator;
+
+public class TweetGenerator {
+
+    private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
+
+    private static final int DEFAULT_DURATION = INFINITY;
+
+    private int duration;
+    private TweetMessageIterator tweetIterator = null;
+    private int partition;
+    private long tweetCount = 0;
+    private int frameTweetCount = 0;
+    private int numFlushedTweets = 0;
+    private DataGenerator dataGenerator = null;
+    private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+    private String[] fields;
+    private final List<OutputStream> subscribers;
+    private final Object lock = new Object();
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
+
+    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+        dataGenerator = new DataGenerator(new InitializationInfo());
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
+        this.subscribers = new ArrayList<OutputStream>();
+    }
+
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+        System.out.println(tweet);
+        tweetCount++;
+        byte[] b = tweet.getBytes();
+        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+            flush();
+            numFlushedTweets += frameTweetCount;
+            frameTweetCount = 0;
+            outputBuffer.put(b);
+        } else {
+            outputBuffer.put(b);
+        }
+        frameTweetCount++;
+    }
+
+    private void flush() throws IOException {
+        outputBuffer.flip();
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
+                } catch (Exception e) {
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
+        outputBuffer.position(0);
+        outputBuffer.limit(32 * 1024);
+    }
+
+    public boolean generateNextBatch(int numTweets) throws Exception {
+        boolean moreData = tweetIterator.hasNext();
+        if (!moreData) {
+            if (outputBuffer.position() > 0) {
+                flush();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
+            return false;
+        } else {
+            int count = 0;
+            while (count < numTweets) {
+                writeTweetString(tweetIterator.next());
+                count++;
+            }
+            return true;
+        }
+    }
+
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                os.close();
+            }
+        }
+    }
+
+    public boolean isSubscribed() {
+        return !subscribers.isEmpty();
+    }
+
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
deleted file mode 100644
index a897294..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolver.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-
-/**
- * Resolves a value (DNS/IP Address) to the id of a Node Controller running at the location.
- */
-public class DNSResolver implements INodeResolver {
-
-    private static Random random = new Random();
-
-    @Override
-    public String resolveNode(String value) throws AsterixException {
-        try {
-            InetAddress ipAddress = InetAddress.getByName(value);
-            Set<String> nodeControllers = AsterixRuntimeUtil.getNodeControllersOnIP(ipAddress);
-            if (nodeControllers == null || nodeControllers.isEmpty()) {
-                throw new AsterixException(" No node controllers found at the address: " + value);
-            }
-            String chosenNCId = nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
-            return chosenNCId;
-        }catch (UnknownHostException e){
-            throw new AsterixException("Unable to resolve hostname '"+ value + "' to an IP address");
-        } catch (AsterixException ae) {
-            throw ae;
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
index 6862d7a..f8585bb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
@@ -18,12 +18,15 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
 /**
- * Factory for creating instance of {@link DNSResolver}
+ * Factory for creating instance of {@link NodeResolver}
  */
 public class DNSResolverFactory implements INodeResolverFactory {
 
-    private static final INodeResolver INSTANCE = new DNSResolver();
+    private static final INodeResolver INSTANCE = new NodeResolver();
 
     @Override
     public INodeResolver createNodeResolver() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
new file mode 100644
index 0000000..ea13f25
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.common.parse.ITupleForwarder.TupleForwardPolicy;
+import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
+import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
+import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class DataflowUtils {
+    public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            appender.flush(writer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    public static ITupleForwarder getTupleForwarder(Map<String, String> configuration) throws AsterixException {
+        ITupleForwarder policy = null;
+        ITupleForwarder.TupleForwardPolicy policyType = null;
+        String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
+        if (propValue == null) {
+            policyType = TupleForwardPolicy.FRAME_FULL;
+        } else {
+            policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase());
+        }
+        switch (policyType) {
+            case FRAME_FULL:
+                policy = new FrameFullTupleForwarder();
+                break;
+            case COUNTER_TIMER_EXPIRED:
+                policy = new CounterTimerTupleForwarder();
+                break;
+            case RATE_CONTROLLED:
+                policy = new RateControlledTupleForwarder();
+                break;
+            default:
+                throw new AsterixException("Unknown tuple forward policy");
+        }
+        return policy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
new file mode 100644
index 0000000..7f91a2b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+
+public class ExternalDataCompatibilityUtils {
+
+    public static void validateCompatibility(IExternalDataSourceFactory dataSourceFactory,
+            IDataParserFactory dataParserFactory) throws AsterixException {
+        if (dataSourceFactory.getDataSourceType() != dataParserFactory.getDataSourceType()) {
+            throw new AsterixException(
+                    "datasource-parser mismatch. datasource produces " + dataSourceFactory.getDataSourceType()
+                            + " and parser expects " + dataParserFactory.getDataSourceType());
+        }
+        if (dataSourceFactory.getDataSourceType() == DataSourceType.RECORDS) {
+            IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
+            IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
+            if (!recordParserFactory.getRecordClass().isAssignableFrom(recordReaderFactory.getRecordClass())) {
+                throw new AsterixException("datasource-parser mismatch. datasource produces records of type "
+                        + recordReaderFactory.getRecordClass() + " and parser expects records of type "
+                        + recordParserFactory.getRecordClass());
+            }
+        }
+    }
+
+    //TODO:Add remaining aliases
+    public static void addCompatabilityParameters(String adapterClassname, ARecordType itemType,
+            Map<String, String> configuration) throws AsterixException {
+        if (adapterClassname.equals(ExternalDataConstants.ALIAS_HDFS_ADAPTER)
+                || adapterClassname.equalsIgnoreCase(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME)) {
+            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+                throw new AsterixException("Unspecified format parameter for HDFS adapter");
+            }
+            if (configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_BINARY)
+                    || configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_HIVE)) {
+                configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_HDFS);
+            } else {
+                configuration.put(ExternalDataConstants.KEY_READER,
+                        configuration.get(ExternalDataConstants.KEY_FORMAT));
+                configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_HDFS_ADAPTER);
+            }
+        }
+        if (adapterClassname.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)
+                || adapterClassname.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME)) {
+            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+                throw new AsterixException("Unspecified format parameter for local file system adapter");
+            }
+            configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT));
+            configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER);
+        }
+        if (configuration.get(ExternalDataConstants.KEY_PARSER) != null
+                && configuration.get(ExternalDataConstants.KEY_PARSER).equals(ExternalDataConstants.PARSER_HIVE)) {
+            configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_HIVE);
+        }
+        if (configuration.get(ExternalDataConstants.KEY_FILESYSTEM) != null) {
+            configuration.put(ExternalDataConstants.KEY_STREAM,
+                    configuration.get(ExternalDataConstants.KEY_FILESYSTEM));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
new file mode 100644
index 0000000..2050e6a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+public class ExternalDataConstants {
+    //TODO: Remove unused variables.
+    /**
+     * Keys
+     */
+    // used to specify the stream factory for an adapter that has a stream data source
+    public static final String KEY_STREAM = "stream";
+    // used to specify the dataverse of the adapter
+    public static final String KEY_DATAVERSE = "dataverse";
+    // used to specify the socket addresses when reading data from sockets
+    public static final String KEY_SOCKETS = "sockets";
+    // specify whether the socket address points to an NC or an IP
+    public static final String KEY_MODE = "address-type";
+    // specify the hdfs name node address when reading hdfs data
+    public static final String KEY_HDFS_URL = "hdfs";
+    // specify the path when reading from a file system
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String KEY_FILESYSTEM = "fs";
+    public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+    public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
+    public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
+    public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
+    public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
+    public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
+    public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
+    public static final String KEY_SOURCE_DATATYPE = "type-name";
+    public static final String KEY_DELIMITER = "delimiter";
+    public static final String KEY_PARSER_FACTORY = "tuple-parser";
+    public static final String KEY_DATA_PARSER = "parser";
+    public static final String KEY_HEADER = "header";
+    public static final String KEY_READER = "reader";
+    public static final String KEY_READER_STREAM = "reader-stream";
+    public static final String KEY_TYPE_NAME = "type-name";
+    public static final String KEY_RECORD_START = "record-start";
+    public static final String KEY_RECORD_END = "record-end";
+    public static final String KEY_EXPRESSION = "expression";
+    public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_QUOTE = "quote";
+    public static final String KEY_PARSER = "parser";
+    public static final String KEY_DATASET_RECORD = "dataset-record";
+    public static final String KEY_HIVE_SERDE = "hive-serde";
+    public static final String KEY_RSS_URL = "url";
+    public static final String KEY_INTERVAL = "interval";
+    public static final String KEY_PULL = "pull";
+    public static final String KEY_PUSH = "push";
+    /**
+     * HDFS class names
+     */
+    public static final String CLASS_NAME_TEXT_INPUT_FORMAT = TextInputFormat.class.getName();
+    public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = SequenceFileInputFormat.class.getName();
+    public static final String CLASS_NAME_RC_INPUT_FORMAT = RCFileInputFormat.class.getName();
+    public static final String CLASS_NAME_HDFS_FILESYSTEM = DistributedFileSystem.class.getName();
+    /**
+     * input formats aliases
+     */
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+    public static final String INPUT_FORMAT_RC = "rc-input-format";
+    /**
+     * Builtin streams
+     */
+
+    /**
+     * Builtin record readers
+     */
+    public static final String READER_HDFS = "hdfs";
+    public static final String READER_ADM = "adm";
+    public static final String READER_SEMISTRUCTURED = "semi-structured";
+    public static final String READER_DELIMITED = "delimited-text";
+
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static final String SCHEDULER = "hdfs-scheduler";
+    public static final String PARSER_HIVE = "hive-parser";
+    public static final String HAS_HEADER = "has.header";
+    public static final String TIME_TRACKING = "time.tracking";
+    public static final String DEFAULT_QUOTE = "\"";
+    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+    public static final String DEFAULT_DELIMITER = ",";
+    public static final String EXTERNAL_LIBRARY_SEPARATOR = "#";
+    public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
+    /**
+     * supported builtin record formats
+     */
+    public static final String FORMAT_HIVE = "hive";
+    public static final String FORMAT_BINARY = "binary";
+    public static final String FORMAT_ADM = "adm";
+    public static final String FORMAT_JSON = "json";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_TWEET = "tweet";
+    public static final String FORMAT_RSS = "rss";
+
+    /**
+     * input streams
+     */
+    public static final String STREAM_HDFS = "hdfs";
+    public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
+    public static final String STREAM_SOCKET = "socket";
+
+    /**
+     * adapter aliases
+     */
+    public static final String ALIAS_GENERIC_ADAPTER = "adapter";
+    public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
+    public static final String ALIAS_HDFS_ADAPTER = "hdfs";
+    public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
+    public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
+    public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
+    public static final String ALIAS_RSS_ADAPTER = "rss_feed";
+    public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
+    public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
+    public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
+    public static final String ALIAS_TWITTER_AZURE_ADAPTER = "azure_twitter";
+    public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
+
+    /**
+     * For backward compatability
+     */
+    public static final String ADAPTER_LOCALFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter";
+    public static final String ADAPTER_HDFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.HDFSAdapter";
+
+    /**
+     * Constant characters
+     */
+    public static final char ESCAPE = '\\';
+    public static final char QUOTE = '"';
+    public static final char SPACE = ' ';
+    public static final char TAB = '\t';
+    public static final char LF = '\n';
+    public static final char CR = '\r';
+    public static final char DEFAULT_RECORD_START = '{';
+    public static final char DEFAULT_RECORD_END = '}';
+
+    /**
+     * Constant byte characters
+     */
+    public static final byte EOL = '\n';
+    public static final byte BYTE_CR = '\r';
+    /**
+     * Size default values
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 4096;
+    public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+
+    /**
+     * Expected parameter values
+     */
+    public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
new file mode 100644
index 0000000..9dcaef4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class ExternalDataExceptionUtils {
+    public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
+    public static final String MISSING_PARAMETER = "Missing parameter.\n";
+    public static final String PARAMETER_NAME = "Parameter name: ";
+    public static final String EXPECTED_VALUE = "Expected value: ";
+    public static final String PASSED_VALUE = "Passed value: ";
+
+    public static String incorrectParameterMessage(String parameterName, String expectedValue, String passedValue) {
+        return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
+                + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
new file mode 100644
index 0000000..7c1c1b5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.FeedConstants;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+
+public class ExternalDataUtils {
+
+    // Get a delimiter from the given configuration
+    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+        String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+        if (delimiterValue == null) {
+            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
+        } else if (delimiterValue.length() != 1) {
+            throw new AsterixException(
+                    "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+        }
+        return delimiterValue.charAt(0);
+    }
+
+    // Get a quote from the given configuration when the delimiter is given
+    // Need to pass delimiter to check whether they share the same character
+    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteValue == null) {
+            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
+        } else if (quoteValue.length() != 1) {
+            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+        }
+
+        // Since delimiter (char type value) can't be null,
+        // we only check whether delimiter and quote use the same character
+        if (quoteValue.charAt(0) == delimiter) {
+            throw new AsterixException(
+                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
+        }
+
+        return quoteValue.charAt(0);
+    }
+
+    // Get the header flag
+    public static boolean getHasHeader(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
+    }
+
+    public static DataSourceType getDataSourceType(Map<String, String> configuration) throws AsterixException {
+        if (isDataSourceStreamProvider(configuration)) {
+            return DataSourceType.STREAM;
+        } else if (isDataSourceRecordReader(configuration)) {
+            return DataSourceType.RECORDS;
+        } else {
+            throw new AsterixException(
+                    "unable to determine whether input is a stream provider or a record reader. parameters: "
+                            + ExternalDataConstants.KEY_STREAM + " or " + ExternalDataConstants.KEY_READER
+                            + " must be specified");
+        }
+    }
+
+    public static boolean isExternal(String aString) {
+        return (aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) && aString.trim().length() > 1);
+    }
+
+    public static ClassLoader getClassLoader(String dataverse, String library) {
+        return ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+    }
+
+    public static String getLibraryName(String aString) {
+        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+    }
+
+    public static String getExternalClassName(String aString) {
+        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
+    }
+
+    public static IInputStreamProviderFactory createExternalInputStreamFactory(String dataverse, String stream)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        String libraryName = getLibraryName(stream);
+        String className = getExternalClassName(stream);
+        ClassLoader classLoader = getClassLoader(dataverse, libraryName);
+        return ((IInputStreamProviderFactory) (classLoader.loadClass(className).newInstance()));
+    }
+
+    public static String getDataverse(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
+    }
+
+    public static boolean isDataSourceStreamProvider(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_STREAM);
+    }
+
+    private static boolean isDataSourceRecordReader(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_READER);
+    }
+
+    public static String getRecordFormat(Map<String, String> configuration) {
+        String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
+    }
+
+    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        return m;
+    }
+
+    public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
+        int n = recordType.getFieldTypes().length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = null;
+            if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
+                List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
+                if (unionTypes.size() != 2 && unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
+                    throw new NotImplementedException("Non-optional UNION type is not supported.");
+                }
+                tag = unionTypes.get(1).getTypeTag();
+            } else {
+                tag = recordType.getFieldTypes()[i].getTypeTag();
+            }
+            if (tag == null) {
+                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+            }
+            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+            if (vpf == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+            }
+            fieldParserFactories[i] = vpf;
+        }
+        return fieldParserFactories;
+    }
+
+    public static String getRecordReaderStreamName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
+    }
+
+    public static boolean hasHeader(Map<String, String> configuration) {
+        String value = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (value != null) {
+            return Boolean.valueOf(value);
+        }
+        return false;
+    }
+
+    public static boolean isPull(Map<String, String> configuration) {
+        String pull = configuration.get(ExternalDataConstants.KEY_PULL);
+        if (pull == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(pull);
+    }
+
+    public static boolean isPush(Map<String, String> configuration) {
+        String push = configuration.get(ExternalDataConstants.KEY_PUSH);
+        if (push == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(push);
+    }
+
+    public static IRecordReaderFactory<?> createExternalRecordReaderFactory(String dataverse, String reader)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        String library = reader.substring(0, reader.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+        return (IRecordReaderFactory<?>) classLoader
+                .loadClass(reader.substring(reader.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
+                .newInstance();
+    }
+
+    public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        String library = parserFactoryName.substring(0,
+                parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+        return (IDataParserFactory) classLoader
+                .loadClass(parserFactoryName
+                        .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
+                .newInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
new file mode 100644
index 0000000..de6737a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.indexing.RecordId.RecordIdType;
+import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.ICCContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
+public class HDFSUtils {
+
+    public static Scheduler initializeHDFSScheduler() {
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        Scheduler scheduler = null;
+        try {
+            scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+                    ccContext.getClusterControllerInfo().getClientNetPort());
+        } catch (HyracksException e) {
+            throw new IllegalStateException("Cannot obtain hdfs scheduler");
+        }
+        return scheduler;
+    }
+
+    public static IndexingScheduler initializeIndexingHDFSScheduler() {
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        IndexingScheduler scheduler = null;
+        try {
+            scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+                    ccContext.getClusterControllerInfo().getClientNetPort());
+        } catch (HyracksException e) {
+            throw new IllegalStateException("Cannot obtain hdfs scheduler");
+        }
+        return scheduler;
+    }
+
+    /**
+     * Instead of creating the split using the input format, we do it manually
+     * This function returns fileSplits (1 per hdfs file block) irrespective of the number of partitions
+     * and the produced splits only cover intersection between current files in hdfs and files stored internally
+     * in AsterixDB
+     * 1. NoOp means appended file
+     * 2. AddOp means new file
+     * 3. UpdateOp means the delta of a file
+     * @return
+     * @throws IOException
+     */
+    public static InputSplit[] getSplits(JobConf conf, List<ExternalFile> files) throws IOException {
+        // Create file system object
+        FileSystem fs = FileSystem.get(conf);
+        ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
+        // Create files splits
+        for (ExternalFile file : files) {
+            Path filePath = new Path(file.getFileName());
+            FileStatus fileStatus;
+            try {
+                fileStatus = fs.getFileStatus(filePath);
+            } catch (FileNotFoundException e) {
+                // file was deleted at some point, skip to next file
+                continue;
+            }
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                // Get its information from HDFS name node
+                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+                // Create a split per block
+                for (BlockLocation block : fileBlocks) {
+                    if (block.getOffset() < file.getSize()) {
+                        fileSplits
+                                .add(new FileSplit(filePath,
+                                        block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+                                                ? block.getLength() : (file.getSize() - block.getOffset()),
+                                block.getHosts()));
+                        orderedExternalFiles.add(file);
+                    }
+                }
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                long oldSize = 0L;
+                long newSize = file.getSize();
+                for (int i = 0; i < files.size(); i++) {
+                    if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
+                        newSize = files.get(i).getSize();
+                        oldSize = file.getSize();
+                        break;
+                    }
+                }
+
+                // Get its information from HDFS name node
+                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+                // Create a split per block
+                for (BlockLocation block : fileBlocks) {
+                    if (block.getOffset() + block.getLength() > oldSize) {
+                        if (block.getOffset() < newSize) {
+                            // Block interact with delta -> Create a split
+                            long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+                            long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+                                    : block.getOffset() + block.getLength() - newSize;
+                            long splitLength = block.getLength() - startCut - endCut;
+                            fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                                    block.getHosts()));
+                            orderedExternalFiles.add(file);
+                        }
+                    }
+                }
+            }
+        }
+        fs.close();
+        files.clear();
+        files.addAll(orderedExternalFiles);
+        return fileSplits.toArray(new FileSplit[fileSplits.size()]);
+    }
+
+    public static String getInputFormatClassName(Map<String, String> configuration) {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT;
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT;
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT;
+            default:
+                return inputFormatParameter;
+        }
+    }
+
+    public static Class<?> getInputFormatClass(Map<String, String> configuration) throws ClassNotFoundException {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return TextInputFormat.class;
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return SequenceFileInputFormat.class;
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return RCFileInputFormat.class;
+            default:
+                return Class.forName(inputFormatParameter);
+        }
+    }
+
+    public static JobConf configureHDFSJobConf(Map<String, String> configuration) throws Exception {
+        JobConf conf = new JobConf();
+
+        String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
+        String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
+                configuration.get(ExternalDataConstants.KEY_HDFS_URL).trim());
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
+        conf.setClassLoader(HDFSInputStreamProvider.class.getClassLoader());
+        conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, configuration.get(ExternalDataConstants.KEY_PATH).trim());
+        conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName);
+
+        // Enable local short circuit reads if user supplied the parameters
+        if (localShortCircuitSocketPath != null) {
+            conf.set(ExternalDataConstants.KEY_HADOOP_SHORT_CIRCUIT, "true");
+            conf.set(ExternalDataConstants.KEY_HADOOP_SOCKET_PATH, localShortCircuitSocketPath.trim());
+        }
+        return conf;
+    }
+
+    public static AlgebricksPartitionConstraint getPartitionConstraints(
+            AlgebricksPartitionConstraint clusterLocations) {
+        if (clusterLocations == null) {
+            ArrayList<String> locs = new ArrayList<String>();
+            Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+            for (String i : stores.keySet()) {
+                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+                for (int k = 0; k < numIODevices; k++) {
+                    locs.add(i);
+                }
+            }
+            String[] cluster = new String[locs.size()];
+            cluster = locs.toArray(cluster);
+            clusterLocations = new AlgebricksAbsolutePartitionConstraint(cluster);
+        }
+        return clusterLocations;
+    }
+
+    public static RecordIdType getRecordIdType(Map<String, String> configuration) {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return RecordIdType.OFFSET;
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return RecordIdType.RC;
+            default:
+                return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
deleted file mode 100644
index 3a92b97..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolver.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-/**
- * A policy for resolving a name to a node controller id.
- */
-public interface INodeResolver {
-
-    /**
-     * Resolve a passed-in value to a node controller id.
-     * 
-     * @param value
-     *            string to be resolved
-     * @return resolved result (a node controller id)
-     * @throws AsterixException
-     */
-    public String resolveNode(String value) throws AsterixException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
deleted file mode 100644
index b3c459b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/INodeResolverFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-/**
- * Factory for creating an instance of INodeResolver
- *
- * @see INodeResolver
- */
-public interface INodeResolverFactory {
-
-    /**
-     * Create an instance of {@link INodeResolver}
-     * 
-     * @return an instance of INodeResolver
-     */
-    public INodeResolver createNodeResolver();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
index 776061f..582189a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentitiyResolverFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
 /**
  * Factory for creating an instance of @see {IdentityResolver}.
  * Identity resolver simply resolves a value to itself and is useful when value being resolved

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
index 2b792b2..bda5f1e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.external.api.INodeResolver;
+
 /**
  * Identity resolver simply resolves a value to itself and is useful when value being resolved
  * is a node controller id.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
new file mode 100644
index 0000000..61764d7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+
+/**
+ * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
+ */
+public class NodeResolver implements INodeResolver {
+    //TODO: change this call and replace by calling AsterixClusterProperties
+    private static final Random random = new Random();
+    private static final Map<InetAddress, Set<String>> ncMap = new HashMap<InetAddress, Set<String>>();
+    private static final Set<String> ncs = new HashSet<String>();
+
+    @Override
+    public String resolveNode(String value) throws AsterixException {
+        UnknownHostException uhe = null;
+        try {
+            if (ncMap.isEmpty()) {
+                NodeResolver.updateNCs();
+            }
+            InetAddress ipAddress = null;
+            try {
+                ipAddress = InetAddress.getByName(value);
+            } catch (UnknownHostException e) {
+                uhe = e;
+            }
+            if (ipAddress == null) {
+                if (ncs.contains(value)) {
+                    return value;
+                } else {
+                    NodeResolver.updateNCs();
+                    if (ncs.contains(value)) {
+                        return value;
+                    } else {
+                        throw new AsterixException("address passed: '" + value
+                                + "' couldn't be resolved to an ip address and is not an NC id. Existing NCs are "
+                                + ncs.toString(), uhe);
+                    }
+                }
+
+            }
+            Set<String> nodeControllers = ncMap.get(ipAddress);
+            if (nodeControllers == null || nodeControllers.isEmpty()) {
+                throw new AsterixException(" No node controllers found at the address: " + value);
+            }
+            String chosenNCId = nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
+            return chosenNCId;
+        } catch (UnknownHostException e) {
+            throw new AsterixException("Unable to resolve hostname '" + value + "' to an IP address");
+        } catch (AsterixException ae) {
+            throw ae;
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    private static void updateNCs() throws Exception {
+        synchronized (ncMap) {
+            ncMap.clear();
+            AsterixRuntimeUtil.getNodeControllerMap(ncMap);
+            synchronized (ncs) {
+                ncs.clear();
+                for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
+                    ncs.addAll(entry.getValue());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/resources/adm.grammar
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/resources/adm.grammar b/asterix-external-data/src/main/resources/adm.grammar
new file mode 100644
index 0000000..1910436
--- /dev/null
+++ b/asterix-external-data/src/main/resources/adm.grammar
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# LEXER GENERATOR configuration file
+# ---------------------------------------
+# Place *first* the generic configuration
+# then list your grammar.
+
+PACKAGE:          org.apache.asterix.runtime.operators.file.adm
+LEXER_NAME:       AdmLexer
+
+TOKENS:
+
+BOOLEAN_CONS   = string(boolean)
+INT8_CONS      = string(int8)
+INT16_CONS     = string(int16)
+INT32_CONS     = string(int32)
+INT64_CONS     = string(int64)
+INT64_CONS     = string(int)
+FLOAT_CONS     = string(float)
+DOUBLE_CONS    = string(double)
+DATE_CONS      = string(date)
+DATETIME_CONS  = string(datetime)
+DURATION_CONS  = string(duration)
+STRING_CONS    = string(string)
+HEX_CONS       = string(hex)
+BASE64_CONS    = string(base64)
+POINT_CONS     = string(point)
+POINT3D_CONS   = string(point3d)
+LINE_CONS      = string(line)
+POLYGON_CONS   = string(polygon)
+RECTANGLE_CONS = string(rectangle)
+CIRCLE_CONS    = string(circle)
+TIME_CONS      = string(time)
+INTERVAL_TIME_CONS      = string(interval-time)
+INTERVAL_DATE_CONS      = string(interval-date)
+INTERVAL_DATETIME_CONS  = string(interval-datetime)
+YEAR_MONTH_DURATION_CONS = string(year-month-duration)
+DAY_TIME_DURATION_CONS   = string(day-time-duration)
+UUID_CONS      = string(uuid)
+
+NULL_LITERAL   = string(null)
+TRUE_LITERAL   = string(true)
+FALSE_LITERAL  = string(false)
+
+CONSTRUCTOR_OPEN     = char(()
+CONSTRUCTOR_CLOSE    = char())
+START_RECORD         = char({)
+END_RECORD           = char(})
+COMMA                = char(\,)
+COLON                = char(:)
+START_ORDERED_LIST   = char([)
+END_ORDERED_LIST     = char(])
+START_UNORDERED_LIST = string({{)
+# END_UNORDERED_LIST  = }} is recognized as a double END_RECORD token
+
+STRING_LITERAL       = char("), anythingUntil(")
+
+INT_LITERAL          = signOrNothing(), digitSequence()
+INT8_LITERAL         = token(INT_LITERAL), string(i8)
+INT16_LITERAL        = token(INT_LITERAL), string(i16)
+INT32_LITERAL        = token(INT_LITERAL), string(i32)
+INT64_LITERAL        = token(INT_LITERAL), string(i64)
+
+@EXPONENT            = caseInsensitiveChar(e), signOrNothing(), digitSequence()
+
+DOUBLE_LITERAL		 = signOrNothing(), char(.), digitSequence()
+DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), char(.), digitSequence()
+DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), char(.), digitSequence(), token(@EXPONENT)
+DOUBLE_LITERAL		 = signOrNothing(), digitSequence(), token(@EXPONENT)
+
+FLOAT_LITERAL		 = token(DOUBLE_LITERAL), caseInsensitiveChar(f)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
index 2c1f02a..db693a1 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class AddHashTagsFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
index bca508f..db717e6 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
@@ -23,6 +23,8 @@ import org.apache.asterix.external.library.java.JObjects.JPoint;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
 import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 import org.apache.asterix.external.util.Datatypes;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
index aec9e5d..a13da84 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class AddHashTagsInPlaceFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
index 2765225..399f0f9 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.library;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
 import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 import org.apache.asterix.external.util.Datatypes;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
index dc0ab7a..9050462 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class AllTypesFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
index 12ce871..8f65bee 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AllTypesFunction.java
@@ -35,6 +35,8 @@ import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
 import org.apache.asterix.external.library.java.JObjects.JTime;
 import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 
 public class AllTypesFunction implements IExternalScalarFunction {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
index 7d1d3da..e15cb3d 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class CapitalFinderFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
index 21467af..969e109 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/CapitalFinderFunction.java
@@ -23,6 +23,8 @@ import java.util.Properties;
 
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 
 public class CapitalFinderFunction implements IExternalScalarFunction {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
index 21ad776..5d8126b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class EchoDelayFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
index e564ca0..c115ac4 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/EchoDelayFunction.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.library;
 
 import java.util.Random;
 
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 
 public class EchoDelayFunction implements IExternalScalarFunction {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
index db3a5fa..5515ebd 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class ParseTweetFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
index caa0544..b9c736a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ParseTweetFunction.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.library;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
 import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 
 public class ParseTweetFunction implements IExternalScalarFunction {