Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:15 UTC

[17/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
deleted file mode 100644
index 64c62f7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * An adapter that reads external data residing on the local file system of
- * an NC. One FileSplit is consumed per partition.
- */
-public class NCFileSystemAdapter extends FileSystemBasedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FileSplit[] fileSplits;
-
-    public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
-            IHyracksTaskContext ctx) throws HyracksDataException {
-        super(parserFactory, atype, ctx);
-        this.fileSplits = fileSplits;
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        FileSplit split = fileSplits[partition];
-        File inputFile = split.getLocalFile().getFile();
-        InputStream in;
-        try {
-            in = new FileInputStream(inputFile);
-            return in;
-        } catch (FileNotFoundException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public String getFilename(int partition) {
-        final FileSplit fileSplit = fileSplits[partition];
-        return fileSplit.getNodeName() + ":" + fileSplit.getLocalFile().getFile().getPath();
-    }
-
-}
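
The deleted adapter's contract is simple: each partition maps to exactly one file on the NC's local file system. Below is a minimal, self-contained sketch of that mapping in plain java.io, with a hypothetical LocalSplit class standing in for the Hyracks FileSplit API (names here are illustration only, not AsterixDB classes):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;

    public class LocalSplitReader {
        // Hypothetical stand-in for org.apache.hyracks FileSplit.
        static class LocalSplit {
            final String nodeName;
            final File file;
            LocalSplit(String nodeName, File file) {
                this.nodeName = nodeName;
                this.file = file;
            }
        }

        private final LocalSplit[] splits;

        public LocalSplitReader(LocalSplit[] splits) {
            this.splits = splits;
        }

        // Mirrors getInputStream(partition): one local file per partition.
        public InputStream open(int partition) throws IOException {
            try {
                return new FileInputStream(splits[partition].file);
            } catch (FileNotFoundException e) {
                throw new IOException(e); // surface as IOException, as above
            }
        }

        // Mirrors getFilename(partition): "node:path".
        public String name(int partition) {
            LocalSplit s = splits[partition];
            return s.nodeName + ":" + s.file.getPath();
        }
    }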

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
deleted file mode 100644
index d6b4ba7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.external.feeds.IPullBasedFeedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Abstract base class for all pull-based external data adapters. Captures the
- * common logic for obtaining bytes from an external source and packing them
- * into frames as tuples.
- */
-public abstract class PullBasedAdapter implements IPullBasedFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
-    private static final int timeout = 5; // seconds
-
-    protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-    protected IPullBasedFeedClient pullBasedFeedClient;
-    protected ARecordType adapterOutputType;
-    protected boolean continueIngestion = true;
-    protected Map<String, String> configuration;
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private long tupleCount = 0;
-    private final IHyracksTaskContext ctx;
-    private int frameTupleCount = 0;
-
-    protected FeedPolicyEnforcer policyEnforcer;
-
-    public FeedPolicyEnforcer getPolicyEnforcer() {
-        return policyEnforcer;
-    }
-
-    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
-        this.policyEnforcer = policyEnforcer;
-    }
-
-    public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
-
-    public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
-        this.ctx = ctx;
-        this.configuration = configuration;
-    }
-
-    public long getIngestedRecordsCount() {
-        return tupleCount;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        frame = new VSizeFrame(ctx);
-        appender = new FrameTupleAppender(frame);
-
-        pullBasedFeedClient = getFeedClient(partition);
-        InflowState inflowState = null;
-
-        while (continueIngestion) {
-            tupleBuilder.reset();
-            try {
-                // blocking call
-                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
-                switch (inflowState) {
-                    case DATA_AVAILABLE:
-                        tupleBuilder.addFieldEndOffset();
-                        appendTupleToFrame(writer);
-                        frameTupleCount++;
-                        break;
-                    case NO_MORE_DATA:
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Reached end of feed");
-                        }
-                        appender.flush(writer, true);
-                        tupleCount += frameTupleCount;
-                        frameTupleCount = 0;
-                        continueIngestion = false;
-                        break;
-                    case DATA_NOT_AVAILABLE:
-                        if (frameTupleCount > 0) {
-                            appender.flush(writer, true);
-                            tupleCount += frameTupleCount;
-                            frameTupleCount = 0;
-                        }
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
-                        }
-                        break;
-                }
-
-            } catch (Exception failureException) {
-                try {
-                    failureException.printStackTrace();
-                    boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
-                    if (continueIngestion) {
-                        tupleBuilder.reset();
-                        continue;
-                    } else {
-                        throw failureException;
-                    }
-                } catch (Exception recoveryException) {
-                    throw new Exception(recoveryException);
-                }
-            }
-        }
-    }
-
-    private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                tupleBuilder.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                    tupleBuilder.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    /**
-     * Discontinue the ingestion of data and end the feed.
-     * 
-     * @throws Exception
-     */
-    public void stop() throws Exception {
-        continueIngestion = false;
-    }
-
-    public Map<String, String> getConfiguration() {
-        return configuration;
-    }
-
-}
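
The loop in start() above drives a three-state protocol: DATA_AVAILABLE appends a tuple to the current frame, DATA_NOT_AVAILABLE flushes any partial frame after a timeout and retries, and NO_MORE_DATA flushes the final frame and ends ingestion. A minimal plain-Java sketch of that protocol follows; the iterator is a stand-in for the feed client, and the timeout branch is shown but never taken in this toy run:

    import java.util.Arrays;
    import java.util.Iterator;

    public class PullLoopSketch {
        enum InflowState { DATA_AVAILABLE, DATA_NOT_AVAILABLE, NO_MORE_DATA }

        public static void main(String[] args) {
            Iterator<String> source = Arrays.asList("r1", "r2", "r3").iterator();
            boolean continueIngestion = true;
            while (continueIngestion) {
                // Stands in for pullBasedFeedClient.nextTuple(...)
                InflowState state =
                        source.hasNext() ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
                switch (state) {
                    case DATA_AVAILABLE:
                        // plays the role of appendTupleToFrame(writer)
                        System.out.println("append to frame: " + source.next());
                        break;
                    case DATA_NOT_AVAILABLE:
                        // the timeout path: flush whatever is buffered, then retry
                        System.out.println("timed out; flush partial frame and retry");
                        break;
                    case NO_MORE_DATA:
                        System.out.println("flush final frame and stop");
                        continueIngestion = false;
                        break;
                }
            }
        }
    }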

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
deleted file mode 100644
index 985399b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
-import com.microsoft.windowsazure.services.table.client.CloudTableClient;
-import com.microsoft.windowsazure.services.table.client.TableConstants;
-import com.microsoft.windowsazure.services.table.client.TableQuery;
-import com.microsoft.windowsazure.services.table.client.TableQuery.Operators;
-import com.microsoft.windowsazure.services.table.client.TableQuery.QueryComparisons;
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.ResettableByteArrayOutputStream;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-
-public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
-    private static final Logger LOGGER = Logger.getLogger(PullBasedAzureFeedClient.class.getName());
-
-    private final String tableName;
-    private final ARecordType outputType;
-    private final CloudTableClient ctc;
-    private final TableQuery<? extends TableServiceEntity> tableQuery;
-    private Iterator<? extends TableServiceEntity> entityIt;
-
-    private final Pattern arrayPattern = Pattern.compile("\\[(?<vals>.*)\\]");
-    private final Pattern int32Pattern = Pattern.compile(":(?<int>\\d+)(,|})");
-    private final Pattern doubleWithEndingZeroPattern = Pattern.compile("\\d+\\.(?<zero>0)(,|})");
-
-    private final ResettableByteArrayOutputStream rbaos;
-    private final DataOutputStream dos;
-    private final ADMDataParser adp;
-    private final ByteArrayAccessibleInputStream baais;
-
-    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName, String lowKey,
-            String highKey) throws AsterixException {
-        this.tableName = tableName;
-        this.outputType = outputType;
-        this.tableQuery = configureTableQuery(tableName, lowKey, highKey);
-        this.ctc = csa.createCloudTableClient();
-        rbaos = new ResettableByteArrayOutputStream();
-        dos = new DataOutputStream(rbaos);
-        baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
-        adp = new ADMDataParser();
-        adp.initialize(baais, outputType, false);
-    }
-
-    private TableQuery<? extends TableServiceEntity> configureTableQuery(String tableName, String lowKey, String highKey) {
-        TableQuery<? extends TableServiceEntity> baseTQ = TableQuery.from(tableName, classFromString(tableName));
-        if (lowKey != null && highKey != null) {
-            String lowKeyPredicate = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
-                    QueryComparisons.GREATER_THAN_OR_EQUAL, lowKey);
-            String highKeyPredicate = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
-                    QueryComparisons.LESS_THAN_OR_EQUAL, highKey);
-            String partitionPredicate = TableQuery.combineFilters(lowKeyPredicate, Operators.AND, highKeyPredicate);
-            return baseTQ.where(partitionPredicate);
-        }
-
-        return baseTQ;
-    }
-
-    private Class<? extends TableServiceEntity> classFromString(String tableName) {
-        return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
-    }
-
-    @Override
-    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
-        if (entityIt == null) {
-            entityIt = ctc.execute(tableQuery).iterator();
-        }
-
-        boolean moreTweets = entityIt.hasNext();
-        if (moreTweets) {
-            String json = null;
-            try {
-                json = getJSONString();
-                byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
-                rbaos.reset();
-                dos.write(jsonBytes, 0, jsonBytes.length);
-                dos.flush();
-                baais.setContent(rbaos.getByteArray(), 0, jsonBytes.length);
-                adp.initialize(baais, outputType, false);
-                adp.parse(dataOutput);
-            } catch (Exception e) {
-                if (json != null) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Record in error: " + json);
-                    }
-                }
-                e.printStackTrace();
-                throw new AsterixException(e);
-            }
-        }
-        return moreTweets ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
-    }
-
-    private String getJSONString() throws JSONException {
-        if (tableName.equals("Postings")) {
-            AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
-            JSONObject tjo = new JSONObject(tweet.getJSON().toString());
-            tjo.put("posting_id", tweet.getRowKey());
-            tjo.put("user_id", tweet.getPartitionKey());
-            tjo.remove("id");
-            JSONObject utjo = tjo.getJSONObject("user");
-            utjo.remove("id");
-            tjo.put("user", utjo);
-            return tjo.toString();
-        } else if (tableName.equals("PostingMetadata")) {
-            AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
-            JSONObject tmdjo = new JSONObject();
-            tmdjo.put("posting_id", tweetMD.getRowKey());
-            tmdjo.put("user_id", tweetMD.getPartitionKey());
-            tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
-            tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
-            List<Integer> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
-            tmdjo.put("product_id", productIdList);
-            if (tweetMD.getEthnicity() != null) {
-                tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
-            }
-            if (tweetMD.getGender() != null) {
-                tmdjo.put("gender", new JSONObject(stripTillColon(tweetMD.getGender())));
-            }
-            if (tweetMD.getLocation() != null) {
-                String locStr = stripTillColon(tweetMD.getLocation());
-                Matcher m = int32Pattern.matcher(locStr);
-                while (m.find()) {
-                    locStr = locStr.replace(m.group("int"), m.group("int") + ".01");
-                }
-                m = doubleWithEndingZeroPattern.matcher(locStr);
-                while (m.find()) {
-                    locStr = locStr.replace(m.group("zero"), "01");
-                }
-                tmdjo.put("location", new JSONObject(locStr));
-            }
-            if (tweetMD.getSentiment() != null) {
-                tmdjo.put("sentiment", stripTillColon(tweetMD.getSentiment()));
-            }
-            return tmdjo.toString();
-        } else {
-            throw new IllegalArgumentException();
-        }
-    }
-
-    private String stripTillColon(String str) {
-        return str.substring(str.indexOf(':') + 1);
-    }
-
-    private Integer[] extractArray(String str) {
-        Matcher m = arrayPattern.matcher(str);
-        m.find();
-        String[] stringNums = m.group("vals").replaceAll("\\s", "").split(",");
-        Integer[] nums = new Integer[stringNums.length];
-        for (int i = 0; i < nums.length; ++i) {
-            nums[i] = Integer.parseInt(stringNums[i]);
-        }
-        return nums;
-    }
-}
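
The regex fix-ups above coerce integer-valued and trailing-".0" coordinates in the "location" JSON into doubles, so the downstream ADM parser sees the numeric type it expects. The following self-contained demonstration uses plain java.util.regex with invented sample input; note that String.replace swaps every occurrence of the matched text, so the sample values are chosen to appear only once each:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LocationFixupDemo {
        public static void main(String[] args) {
            Pattern int32Pattern = Pattern.compile(":(?<int>\\d+)(,|})");
            Pattern doubleWithEndingZeroPattern = Pattern.compile("\\d+\\.(?<zero>0)(,|})");

            // Integer-valued coordinates become doubles: 33 -> 33.01, 117 -> 117.01
            String locStr = "{\"lat\":33,\"lon\":117}";
            Matcher m = int32Pattern.matcher(locStr);
            while (m.find()) {
                locStr = locStr.replace(m.group("int"), m.group("int") + ".01");
            }
            System.out.println(locStr); // {"lat":33.01,"lon":117.01}

            // A trailing ".0" is rewritten to ".01" for the same reason.
            String altStr = "{\"alt\":41.0}";
            m = doubleWithEndingZeroPattern.matcher(altStr);
            while (m.find()) {
                altStr = altStr.replace(m.group("zero"), "01");
            }
            System.out.println(altStr); // {"alt":41.01}
        }
    }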

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
deleted file mode 100644
index e8cacde..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
-    private static final Logger LOGGER = Logger.getLogger(PullBasedAzureTwitterAdapter.class.getName());
-
-    private static final long serialVersionUID = 1L;
-
-    private final CloudStorageAccount csa;
-    private final String connectionString;
-    private final String azureAccountName;
-    private final String azureAccountKey;
-    private final ARecordType outputType;
-    private final String tableName;
-    private final boolean partitioned;
-
-    private String[] lowKeys;
-    private String[] highKeys;
-
-    public PullBasedAzureTwitterAdapter(String accountName, String accountKey, String tableName, String[] partitions,
-            Map<String, String> configuration, IHyracksTaskContext ctx, ARecordType outputType) throws AsterixException {
-        super(configuration, ctx);
-        this.outputType = outputType;
-        if (partitions != null) {
-            partitioned = true;
-            configurePartitions(partitions);
-        } else {
-            partitioned = false;
-        }
-        this.azureAccountName = accountName;
-        this.azureAccountKey = accountKey;
-        this.tableName = tableName;
-
-        connectionString = "DefaultEndpointsProtocol=http;" + "AccountName=" + azureAccountName + ";AccountKey="
-                + azureAccountKey + ";";
-        try {
-            csa = CloudStorageAccount.parse(connectionString);
-        } catch (InvalidKeyException | URISyntaxException e) {
-            throw new AsterixException("You must specify a valid Azure account name and key", e);
-        }
-    }
-
-    private void configurePartitions(String[] partitions) {
-        lowKeys = new String[partitions.length];
-        highKeys = new String[partitions.length];
-        for (int i = 0; i < partitions.length; ++i) {
-            String[] loHi = partitions[i].split(":");
-            lowKeys[i] = loHi[0];
-            highKeys[i] = loHi[1];
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Partition " + i + " configured for keys " + lowKeys[i] + " to " + highKeys[i]);
-            }
-        }
-    }
-
-    @Override
-    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
-        if (partitioned) {
-            return new PullBasedAzureFeedClient(csa, outputType, tableName, lowKeys[partition], highKeys[partition]);
-        }
-        return new PullBasedAzureFeedClient(csa, outputType, tableName, null, null);
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PULL;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
deleted file mode 100644
index 90281b7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An adapter that receives tweets from the Twitter service in the form of
- * ADM-formatted records.
- */
-public class PullBasedTwitterAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final int DEFAULT_BATCH_SIZE = 5;
-
-    private ARecordType recordType;
-    private PullBasedTwitterFeedClient tweetClient;
-
-    @Override
-    public IFeedClient getFeedClient(int partition) {
-        return tweetClient;
-    }
-
-    public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
-            throws AsterixException {
-        super(configuration, ctx);
-        tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
-    }
-
-    public ARecordType getAdapterOutputType() {
-        return recordType;
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PULL;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return true;
-    }
-
-    @Override
-    public ITupleForwardPolicy getTupleParserPolicy() {
-        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
-                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
-        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
-        if (propValue == null) {
-            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
-        }
-        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
-    }
-
-}
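
The defaulting logic in getTupleParserPolicy() amounts to a putIfAbsent on the configuration map. A minimal illustration, using hypothetical string keys in place of the real AsterixDB constants:

    import java.util.HashMap;
    import java.util.Map;

    public class BatchSizeDefaultDemo {
        public static void main(String[] args) {
            Map<String, String> configuration = new HashMap<>();
            configuration.put("parser-policy", "counter_timer_expired");
            // only applied when the user did not set a batch size
            configuration.putIfAbsent("batch-size", String.valueOf(5));
            System.out.println(configuration); // batch-size defaults to 5
        }
    }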

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
deleted file mode 100644
index 8b5e1e1..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.List;
-import java.util.Map;
-
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An implementation of {@link FeedClient} for the Twitter service. The feed
- * client fetches data from the Twitter service by sending search requests at
- * a regular (configurable) interval.
- */
-public class PullBasedTwitterFeedClient extends FeedClient {
-
-    private String keywords;
-    private Query query;
-    private Twitter twitter;
-    private int requestInterval = 5; // seconds
-    private QueryResult result;
-
-    private ARecordType recordType;
-    private int nextTweetIndex = 0;
-    private long lastTweetIdReceived = 0;
-    private TweetProcessor tweetProcessor;
-
-    public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
-        this.twitter = TwitterUtil.getTwitterService(adapter.getConfiguration());
-        this.recordType = recordType;
-        this.tweetProcessor = new TweetProcessor(recordType);
-        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
-        this.mutableRecord = tweetProcessor.getMutableRecord();
-        this.initialize(adapter.getConfiguration());
-    }
-
-    public ARecordType getRecordType() {
-        return recordType;
-    }
-
-    @Override
-    public InflowState retrieveNextRecord() throws Exception {
-        Status tweet;
-        tweet = getNextTweet();
-        if (tweet == null) {
-            return InflowState.DATA_NOT_AVAILABLE;
-        }
-
-        tweetProcessor.processNextTweet(tweet);
-        return InflowState.DATA_AVAILABLE;
-    }
-
-    private void initialize(Map<String, String> params) {
-        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
-        this.requestInterval = Integer.parseInt((String) params.get(SearchAPIConstants.INTERVAL));
-        this.query = new Query(keywords);
-        this.query.setCount(100);
-    }
-
-    private Status getNextTweet() throws TwitterException, InterruptedException {
-        if (result == null || nextTweetIndex >= result.getTweets().size()) {
-            Thread.sleep(1000 * requestInterval);
-            query.setSinceId(lastTweetIdReceived);
-            result = twitter.search(query);
-            nextTweetIndex = 0;
-        }
-        if (result != null && !result.getTweets().isEmpty()) {
-            List<Status> tw = result.getTweets();
-            Status tweet = tw.get(nextTweetIndex++);
-            if (lastTweetIdReceived < tweet.getId()) {
-                lastTweetIdReceived = tweet.getId();
-            }
-            return tweet;
-        } else {
-            return null;
-        }
-    }
-
-}
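
getNextTweet() above avoids re-fetching tweets by remembering the highest id seen and passing it back as the query's sinceId on the next poll. A self-contained sketch of that pagination follows; the Source interface is a hypothetical stand-in for the twitter4j search call, and the fake service holds ids 1 through 6:

    import java.util.ArrayList;
    import java.util.List;

    public class SinceIdPollingSketch {
        // Hypothetical stand-in for twitter.search(query).
        interface Source {
            List<Long> search(long sinceId); // returns ids greater than sinceId
        }

        public static void main(String[] args) {
            // A fake service with tweets 1..6, served three per request.
            Source source = sinceId -> {
                List<Long> batch = new ArrayList<>();
                for (long id = sinceId + 1; id <= sinceId + 3 && id <= 6; id++) {
                    batch.add(id);
                }
                return batch;
            };

            long lastIdReceived = 0; // plays the role of lastTweetIdReceived
            for (int poll = 0; poll < 3; poll++) {
                List<Long> batch = source.search(lastIdReceived); // query.setSinceId(...)
                for (long id : batch) {
                    if (lastIdReceived < id) {
                        lastIdReceived = id;
                    }
                    System.out.println("tweet " + id);
                }
            }
            // Prints ids 1..6 exactly once; the third poll returns nothing new.
        }
    }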

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
deleted file mode 100644
index 01839d3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class PushBasedTwitterAdapter extends ClientBasedFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final int DEFAULT_BATCH_SIZE = 50;
-
-    private PushBasedTwitterFeedClient tweetClient;
-
-    public PushBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
-        super(configuration, ctx);
-        this.configuration = configuration;
-        this.tweetClient = new PushBasedTwitterFeedClient(ctx, recordType, this);
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PUSH;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return true;
-    }
-
-    @Override
-    public IFeedClient getFeedClient(int partition) throws Exception {
-        return tweetClient;
-    }
-
-    @Override
-    public ITupleForwardPolicy getTupleParserPolicy() {
-        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
-                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
-        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
-        if (propValue == null) {
-            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
-        }
-        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
deleted file mode 100644
index bb40ac9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-
-/**
- * An implementation of {@link FeedClient} for the Twitter service. Unlike its
- * pull-based counterpart, this client receives tweets pushed by the Twitter
- * streaming API and buffers them until they are requested.
- */
-public class PushBasedTwitterFeedClient extends FeedClient {
-
-    private ARecordType recordType;
-    private TweetProcessor tweetProcessor;
-    private LinkedBlockingQueue<Status> inputQ;
-
-    public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter)
-            throws AsterixException {
-        this.recordType = recordType;
-        this.tweetProcessor = new TweetProcessor(recordType);
-        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
-        this.mutableRecord = tweetProcessor.getMutableRecord();
-        this.inputQ = new LinkedBlockingQueue<Status>();
-        TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
-        twitterStream.addListener(new TweetListener(inputQ));
-        FilterQuery query = TwitterUtil.getFilterQuery(adapter.getConfiguration());
-        if (query != null) {
-            twitterStream.filter(query);
-        } else {
-            twitterStream.sample();
-        }
-    }
-
-    public ARecordType getRecordType() {
-        return recordType;
-    }
-
-    private class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<Status> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            inputQ.add(tweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-        }
-    }
-
-    @Override
-    public InflowState retrieveNextRecord() throws Exception {
-        Status tweet = inputQ.take();
-        tweetProcessor.processNextTweet(tweet);
-        return InflowState.DATA_AVAILABLE;
-    }
-
-}
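
The push model above decouples the streaming callback from record retrieval through a LinkedBlockingQueue: the listener thread enqueues statuses as they arrive, while retrieveNextRecord() blocks on take() until one is available. A pure-JDK sketch, with a plain thread standing in for the twitter4j stream:

    import java.util.concurrent.LinkedBlockingQueue;

    public class PushQueueSketch {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> inputQ = new LinkedBlockingQueue<>();

            // Producer: plays the role of TweetListener.onStatus(...)
            Thread listener = new Thread(() -> {
                for (int i = 0; i < 3; i++) {
                    inputQ.add("tweet-" + i);
                }
            });
            listener.start();

            // Consumer: plays the role of retrieveNextRecord(); take() blocks
            // until the listener has pushed a record.
            for (int i = 0; i < 3; i++) {
                System.out.println("processing " + inputQ.take());
            }
            listener.join();
        }
    }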

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
deleted file mode 100644
index 69cd82c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * RSSFeedAdapter fetches records from an RSS-based feed.
- */
-public class RSSFeedAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String KEY_RSS_URL = "url";
-
-    private List<String> feedURLs = new ArrayList<String>();
-    private String id_prefix = "";
-
-    private IFeedClient rssFeedClient;
-
-    private ARecordType recordType;
-
-    public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
-            throws AsterixException {
-        super(configuration, ctx);
-        id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
-        this.recordType = recordType;
-        reconfigure(configuration);
-    }
-
-    private void initializeFeedURLs(String rssURLProperty) {
-        feedURLs.clear();
-        String[] feedURLProperty = rssURLProperty.split(",");
-        for (String feedURL : feedURLProperty) {
-            feedURLs.add(feedURL);
-        }
-    }
-
-    protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
-        if (rssURLProperty != null) {
-            initializeFeedURLs(rssURLProperty);
-        }
-    }
-
-    @Override
-    public IFeedClient getFeedClient(int partition) throws Exception {
-        if (rssFeedClient == null) {
-            rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
-        }
-        return rssFeedClient;
-    }
-
-    public ARecordType getRecordType() {
-        return recordType;
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PULL;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return false;
-    }
-
-    @Override
-    public ITupleForwardPolicy getTupleParserPolicy() {
-        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
deleted file mode 100644
index 0b0d0fb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.fetcher.FeedFetcher;
-import com.sun.syndication.fetcher.FetcherEvent;
-import com.sun.syndication.fetcher.FetcherListener;
-import com.sun.syndication.fetcher.impl.FeedFetcherCache;
-import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
-import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
-
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-
-/**
- * An implementation of {@link FeedClient} responsible for fetching entries
- * from an RSS feed source at a regular interval.
- */
-@SuppressWarnings("rawtypes")
-public class RSSFeedClient extends FeedClient {
-
-    private long id = 0;
-    private String idPrefix;
-    private boolean feedModified = false;
-
-    private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
-
-    IAObject[] mutableFields;
-
-    private final FeedFetcherCache feedInfoCache;
-    private final FeedFetcher fetcher;
-    private final FetcherEventListenerImpl listener;
-    private final URL feedUrl;
-    private ARecordType recordType;
-    String[] tupleFieldValues;
-
-    public boolean isFeedModified() {
-        return feedModified;
-    }
-
-    public void setFeedModified(boolean feedModified) {
-        this.feedModified = feedModified;
-    }
-
-    public RSSFeedClient(RSSFeedAdapter adapter, String feedURL, String id_prefix) throws MalformedURLException {
-        this.idPrefix = id_prefix;
-        this.feedUrl = new URL(feedURL);
-        feedInfoCache = HashMapFeedInfoCache.getInstance();
-        fetcher = new HttpURLFeedFetcher(feedInfoCache);
-        listener = new FetcherEventListenerImpl(this);
-        fetcher.addFetcherEventListener(listener);
-        mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
-                new AMutableString(null) };
-        recordType = adapter.getRecordType();
-        mutableRecord = new AMutableRecord(recordType, mutableFields);
-        tupleFieldValues = new String[recordType.getFieldNames().length];
-    }
-
-    @Override
-    public InflowState retrieveNextRecord() throws Exception {
-        SyndEntryImpl feedEntry = getNextRSSFeed();
-        if (feedEntry == null) {
-            return InflowState.DATA_NOT_AVAILABLE;
-        }
-        tupleFieldValues[0] = idPrefix + ":" + id;
-        tupleFieldValues[1] = feedEntry.getTitle();
-        tupleFieldValues[2] = feedEntry.getDescription().getValue();
-        tupleFieldValues[3] = feedEntry.getLink();
-        int numFields = recordType.getFieldNames().length;
-        for (int i = 0; i < numFields; i++) {
-            ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
-            mutableRecord.setValueAtPos(i, mutableFields[i]);
-        }
-        id++;
-        return InflowState.DATA_AVAILABLE;
-    }
-
-    private SyndEntryImpl getNextRSSFeed() throws Exception {
-        if (rssFeedBuffer.isEmpty()) {
-            fetchFeed();
-        }
-        if (rssFeedBuffer.isEmpty()) {
-            return null;
-        } else {
-            return rssFeedBuffer.remove();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void fetchFeed() {
-        try {
-            // Retrieve the feed.
-            // We will get a Feed Polled Event and then a
-            // Feed Retrieved event (assuming the feed is valid)
-            SyndFeed feed = fetcher.retrieveFeed(feedUrl);
-            if (feedModified) {
-                System.err.println(feedUrl + " retrieved");
-                System.err.println(feedUrl + " has a title: " + feed.getTitle() + " and contains "
-                        + feed.getEntries().size() + " entries.");
-
-                List fetchedFeeds = feed.getEntries();
-                rssFeedBuffer.addAll(fetchedFeeds);
-            }
-        } catch (Exception ex) {
-            System.out.println("ERROR: " + ex.getMessage());
-            ex.printStackTrace();
-        }
-    }
-
-}
-
-class FetcherEventListenerImpl implements FetcherListener {
-
-    private final IFeedClient feedClient;
-
-    public FetcherEventListenerImpl(IFeedClient feedClient) {
-        this.feedClient = feedClient;
-    }
-
-    /**
-     * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
-     */
-    public void fetcherEvent(FetcherEvent event) {
-        String eventType = event.getEventType();
-        if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
-            System.err.println("\tEVENT: Feed Polled. URL = " + event.getUrlString());
-        } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
-            System.err.println("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
-            ((RSSFeedClient) feedClient).setFeedModified(true);
-        } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
-            System.err.println("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
-            ((RSSFeedClient) feedClient).setFeedModified(true);
-        }
-    }
-}
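
getNextRSSFeed() above follows a buffer-then-drain pattern: the queue is refilled only when it runs dry, and one entry is handed out per call. A minimal sketch of that pattern, with invented entries in place of the syndication fetcher:

    import java.util.LinkedList;
    import java.util.Queue;

    public class BufferedFetchSketch {
        private final Queue<String> buffer = new LinkedList<>();
        private int batch = 0;

        // Stands in for fetcher.retrieveFeed(...); contents are invented.
        private void fetch() {
            batch++;
            buffer.add("entry-" + batch + "a");
            buffer.add("entry-" + batch + "b");
        }

        // Mirrors getNextRSSFeed(): refill only when empty, pop one entry.
        public String next() {
            if (buffer.isEmpty()) {
                fetch();
            }
            return buffer.isEmpty() ? null : buffer.remove();
        }

        public static void main(String[] args) {
            BufferedFetchSketch s = new BufferedFetchSketch();
            System.out.println(s.next()); // entry-1a
            System.out.println(s.next()); // entry-1b
            System.out.println(s.next()); // entry-2a (buffer refilled)
        }
    }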
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
index b436177..3f10dc4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -23,7 +23,7 @@ import java.io.InputStream;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,7 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public abstract class StreamBasedAdapter implements IDatasourceAdapter {
+public abstract class StreamBasedAdapter implements IDataSourceAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,8 +43,8 @@ public abstract class StreamBasedAdapter implements IDatasourceAdapter {
 
     protected final IAType sourceDatatype;
 
-    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
+    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx,
+            int partition) throws HyracksDataException {
         this.tupleParser = parserFactory.createTupleParser(ctx);
         this.sourceDatatype = sourceDatatype;
     }
@@ -56,7 +56,8 @@ public abstract class StreamBasedAdapter implements IDatasourceAdapter {
             tupleParser.parse(in, writer);
         } else {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
+                LOGGER.warning(
+                        "Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
deleted file mode 100644
index 62052af..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-
-public interface IPullBasedFeedAdapter extends IFeedAdapter {
-
-    /**
-     * @return the policy enforcer consulted when ingestion encounters a failure
-     */
-    public FeedPolicyEnforcer getPolicyEnforcer();
-
-    /**
-     * @param feedPolicyEnforcer the policy enforcer to consult on ingestion failures
-     */
-    public void setFeedPolicyEnforcer(FeedPolicyEnforcer feedPolicyEnforcer);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
index 3988f1a..533d119 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
@@ -40,6 +40,16 @@ public class ExternalFile implements Serializable, Comparable<ExternalFile> {
     private int fileNumber;
     private ExternalFilePendingOp pendingOp;
 
+    public ExternalFile() {
+        this.dataverseName = "";
+        this.datasetName = "";
+        this.fileNumber = -1;
+        this.fileName = "";
+        this.lastModefiedTime = new Date();
+        this.size = 0;
+        this.pendingOp = ExternalFilePendingOp.PENDING_NO_OP;
+    }
+
     public ExternalFile(String dataverseName, String datasetName, int fileNumber, String fileName,
             Date lastModefiedTime, long size, ExternalFilePendingOp pendingOp) {
         this.dataverseName = dataverseName;
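
The new no-arg constructor gives ExternalFile placeholder defaults, which is handy when a caller needs an instance that will be filled in later (for example by the files-index lookup further down in this patch). A minimal sketch, assuming only the getters this patch already uses elsewhere:

    import org.apache.asterix.external.indexing.ExternalFile;

    public final class ExternalFileExample {
        public static void main(String[] args) {
            // Defaults per the new constructor: empty names, fileNumber == -1,
            // size == 0, pendingOp == PENDING_NO_OP, last-modified time == now.
            ExternalFile file = new ExternalFile();
            System.out.println(file.getFileNumber()); // -1 until a lookup fills it in
        }
    }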

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index b10379b..d94db08 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -24,7 +24,7 @@ import java.io.DataInputStream;
 import java.io.Serializable;
 import java.util.Date;
 
-import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AInt64;
@@ -57,7 +57,7 @@ public class ExternalFileIndexAccessor implements Serializable {
     private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
     private static final long serialVersionUID = 1L;
     private ExternalBTreeDataflowHelper indexDataflowHelper;
-    private ExternalLoopkupOperatorDiscriptor opDesc;
+    private ExternalLookupOperatorDescriptor opDesc;
 
     private IHyracksTaskContext ctx;
     private ExternalBTree index;
@@ -72,39 +72,34 @@ public class ExternalFileIndexAccessor implements Serializable {
     private IIndexCursor fileIndexSearchCursor;
 
     public ExternalFileIndexAccessor(ExternalBTreeDataflowHelper indexDataflowHelper,
-            ExternalLoopkupOperatorDiscriptor opDesc) {
+            ExternalLookupOperatorDescriptor opDesc) {
         this.indexDataflowHelper = indexDataflowHelper;
         this.opDesc = opDesc;
     }
 
-    public void openIndex() throws HyracksDataException {
+    public void open() throws HyracksDataException {
         // Open the index and get the instance
         indexDataflowHelper.open();
         index = (ExternalBTree) indexDataflowHelper.getIndexInstance();
-        try {
-            // Create search key and search predicate objects
-            searchKey = new ArrayTupleReference();
-            searchKeyTupleBuilder = new ArrayTupleBuilder(FilesIndexDescription.FILE_KEY_SIZE);
-            searchKeyTupleBuilder.reset();
-            searchKeyTupleBuilder.addField(intSerde, currentFileNumber);
-            searchKey.reset(searchKeyTupleBuilder.getFieldEndOffsets(), searchKeyTupleBuilder.getByteArray());
-            searchCmp = BTreeUtils.getSearchMultiComparator(index.getComparatorFactories(), searchKey);
-            searchPredicate = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
+        // Create search key and search predicate objects
+        searchKey = new ArrayTupleReference();
+        searchKeyTupleBuilder = new ArrayTupleBuilder(FilesIndexDescription.FILE_KEY_SIZE);
+        searchKeyTupleBuilder.reset();
+        searchKeyTupleBuilder.addField(intSerde, currentFileNumber);
+        searchKey.reset(searchKeyTupleBuilder.getFieldEndOffsets(), searchKeyTupleBuilder.getByteArray());
+        searchCmp = BTreeUtils.getSearchMultiComparator(index.getComparatorFactories(), searchKey);
+        searchPredicate = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
 
-            // create the accessor  and the cursor using the passed version
-            ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx);
-            fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
-            fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
-        } catch (Exception e) {
-            indexDataflowHelper.close();
-            throw new HyracksDataException(e);
-        }
+        // Create the accessor and the cursor using the passed version
+        ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
+                .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx);
+        fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
+        fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
     }
 
-    public void searchForFile(int fileNumber, ExternalFile file) throws Exception {
+    public void lookup(int fileId, ExternalFile file) throws Exception {
         // Set search parameters
-        currentFileNumber.setValue(fileNumber);
+        currentFileNumber.setValue(fileId);
         searchKeyTupleBuilder.reset();
         searchKeyTupleBuilder.addField(intSerde, currentFileNumber);
         searchKey.reset(searchKeyTupleBuilder.getFieldEndOffsets(), searchKeyTupleBuilder.getByteArray());
@@ -122,14 +117,14 @@ public class ExternalFileIndexAccessor implements Serializable {
             ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
             DataInput in = new DataInputStream(stream);
             ARecord externalFileRecord = (ARecord) filesIndexDescription.EXTERNAL_FILE_RECORD_SERDE.deserialize(in);
-            setExternalFileFromARecord(externalFileRecord, file);
+            setFile(externalFileRecord, file);
         } else {
             // This should never happen
             throw new HyracksDataException("Was not able to find a file in the files index");
         }
     }
 
-    private void setExternalFileFromARecord(ARecord externalFileRecord, ExternalFile file) {
+    private void setFile(ARecord externalFileRecord, ExternalFile file) {
         file.setFileName(
                 ((AString) externalFileRecord.getValueByPos(FilesIndexDescription.EXTERNAL_FILE_NAME_FIELD_INDEX))
                         .getStringValue());
@@ -140,11 +135,13 @@ public class ExternalFileIndexAccessor implements Serializable {
                         .getChrononTime())));
     }
 
-    public void closeIndex() throws HyracksDataException {
-        try {
-            fileIndexSearchCursor.close();
-        } finally {
-            indexDataflowHelper.close();
+    public void close() throws HyracksDataException {
+        if (index != null) {
+            try {
+                fileIndexSearchCursor.close();
+            } finally {
+                indexDataflowHelper.close();
+            }
         }
     }
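
The openIndex/searchForFile/closeIndex trio becomes a conventional open/lookup/close lifecycle, and close() is now a no-op when open() never obtained the index. A hedged caller sketch; constructing the accessor itself (dataflow helper plus operator descriptor) is wired up by the runtime and elided here:

    import org.apache.asterix.external.indexing.ExternalFile;
    import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;

    public final class FileLookupExample {
        // Illustrative only: fetch metadata for one file number from the files index.
        public static ExternalFile fetch(ExternalFileIndexAccessor accessor, int fileId)
                throws Exception {
            ExternalFile file = new ExternalFile(); // placeholder from the new no-arg ctor
            accessor.open();
            try {
                accessor.lookup(fileId, file); // fills in name, size, and mod time
            } finally {
                accessor.close();
            }
            return file;
        }
    }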
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
new file mode 100644
index 0000000..fa22179
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+@SuppressWarnings("unchecked")
+public class FileIndexTupleTranslator {
+    private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+    private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+            filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
+    private RecordBuilder recordBuilder = new RecordBuilder();
+    private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableString aString = new AMutableString(null);
+    private AMutableDateTime aDateTime = new AMutableDateTime(0);
+    private ISerializerDeserializer<IAObject> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    private ISerializerDeserializer<IAObject> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADATETIME);
+    private ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private ArrayTupleReference tuple = new ArrayTupleReference();
+
+    public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException {
+        tupleBuilder.reset();
+        // File Number
+        aInt32.setValue(file.getFileNumber());
+        filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
+                tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // File Record
+        recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE);
+        // write field 0 (File Name)
+        fieldValue.reset();
+        aString.setValue(file.getFileName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(0, fieldValue);
+
+        // write field 1 (File Size)
+        fieldValue.reset();
+        aInt64.setValue(file.getSize());
+        longSerde.serialize(aInt64, fieldValue.getDataOutput());
+        recordBuilder.addField(1, fieldValue);
+
+        // write field 2 (File Mod Date)
+        fieldValue.reset();
+        aDateTime.setValue(file.getLastModefiedTime().getTime());
+        dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
+        recordBuilder.addField(2, fieldValue);
+
+        // write the record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        tupleBuilder.addFieldEndOffset();
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+}
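
FileIndexTupleTranslator turns an ExternalFile into the two-field tuple stored in the files index: an int32 file-number key plus a serialized record of name, size, and modification time. A usage sketch; the file name is a placeholder, and whatever consumes the tuple (e.g. a bulk loader) is elided:

    import org.apache.asterix.external.indexing.ExternalFile;
    import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
    import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;

    public final class TranslatorExample {
        public static void main(String[] args) throws Exception {
            FileIndexTupleTranslator translator = new FileIndexTupleTranslator();
            ExternalFile file = new ExternalFile();   // defaults from the no-arg ctor
            file.setFileName("/data/part-00000");     // placeholder path
            ITupleReference tuple = translator.getTupleFromFile(file);
            // The translator reuses its internal buffers, so the returned tuple
            // is only valid until the next getTupleFromFile call.
            System.out.println(tuple.getFieldCount()); // 2
        }
    }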

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
new file mode 100644
index 0000000..932aece
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class FileOffsetIndexer implements IExternalIndexer {
+
+    private static final long serialVersionUID = 1L;
+    public static final int NUM_OF_FIELDS = 2;
+    protected AMutableInt32 fileNumber = new AMutableInt32(0);
+    protected AMutableInt64 offset = new AMutableInt64(0);
+    protected RecordReader<?, Writable> recordReader;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<IAObject> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+
+    @Override
+    public void reset(IRecordReader<?> reader) throws IOException {
+        // TODO: Make this more generic; we can't assume the records always come from HDFS.
+        @SuppressWarnings("unchecked")
+        HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
+        fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
+        recordReader = hdfsReader.getReader();
+        offset.setValue(recordReader.getPos());
+    }
+
+    @Override
+    public void index(ArrayTupleBuilder tb) throws IOException {
+        tb.addField(intSerde, fileNumber);
+        tb.addField(longSerde, offset);
+        // Get position for next index(tb) call
+        offset.setValue(recordReader.getPos());
+    }
+
+    @Override
+    public int getNumberOfFields() {
+        return NUM_OF_FIELDS;
+    }
+
+}
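
The indexer's contract: reset(reader) is called once per split to capture the file number and the reader's starting position, and index(tb) is called once per parsed record to append the <file number, offset> pair after the record's own fields. A hedged sketch of that calling pattern (the parser that writes the record fields is elided):

    import org.apache.asterix.external.api.IExternalIndexer;
    import org.apache.asterix.external.api.IRecordReader;
    import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

    public final class IndexerExample {
        // Illustrative: one split, one record. In the real parser this runs in a
        // loop, with reset(reader) once per split and index(tb) once per record.
        static void emit(IExternalIndexer indexer, IRecordReader<?> reader,
                ArrayTupleBuilder tb) throws Exception {
            indexer.reset(reader); // capture file number and starting offset
            // ... the parser appends the record's own fields to tb here ...
            indexer.index(tb);     // append the <file number, offset> pair
        }
    }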

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
new file mode 100644
index 0000000..870a6df
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+public class IndexingScheduler {
+    private static final Logger LOGGER = Logger.getLogger(IndexingScheduler.class.getName());
+
+    /** a list of NCs */
+    private String[] NCs;
+
+    /** a map from ip to NCs */
+    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+    /** a map from the NC name to the index */
+    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+    /** a map from NC name to the NodeControllerInfo */
+    private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
+    /**
+     * The constructor of the scheduler.
+     *
+     * @param ipAddress
+     *            The IP address of the Hyracks cluster controller.
+     * @param port
+     *            The client port of the Hyracks cluster controller.
+     * @throws HyracksException
+     */
+    public IndexingScheduler(String ipAddress, int port) throws HyracksException {
+        try {
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+            loadIPAddressToNCMap(ncNameToNcInfos);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    /**
+     * Set location constraints for a file scan operator with a list of file
+     * splits. It tries to assign splits to their local machines fairly;
+     * locality is weighted more heavily than fairness.
+     *
+     * @param splits
+     *            The HDFS file splits to schedule.
+     * @throws HyracksException
+     */
+    public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
+        if (splits == null) {
+            /** deal with the case when the splits array is null */
+            return new String[] {};
+        }
+        int[] workloads = new int[NCs.length];
+        Arrays.fill(workloads, 0);
+        String[] locations = new String[splits.length];
+        Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+        /**
+         * the per-machine slot upper bound is the total number of splits
+         */
+        int upperBoundSlots = splits.length;
+
+        try {
+            Random random = new Random(System.currentTimeMillis());
+            boolean scheduled[] = new boolean[splits.length];
+            Arrays.fill(scheduled, false);
+            /**
+             * scan the splits and build the popularity map;
+             * give the machines with fewer local splits more scheduling priority
+             */
+            buildPopularityMap(splits, locationToNumOfSplits);
+            HashMap<String, Integer> locationToNumOfAssignement = new HashMap<String, Integer>();
+            for (String location : locationToNumOfSplits.keySet()) {
+                locationToNumOfAssignement.put(location, 0);
+            }
+            /**
+             * assign data-local splits to each machine, up to the slot upper bound
+             */
+            scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits,
+                    locationToNumOfAssignement);
+
+            int dataLocalCount = 0;
+            for (int i = 0; i < scheduled.length; i++) {
+                if (scheduled[i]) {
+                    dataLocalCount++;
+                }
+            }
+            LOGGER.info("Data local rate: "
+                    + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
+            /**
+             * assign non-data-local splits to each machine, up to the slot upper bound
+             */
+            locationToNumOfAssignement.clear();
+            for (String nc : NCs) {
+                locationToNumOfAssignement.put(nc, 0);
+            }
+            for (int i = 0; i < scheduled.length; i++) {
+                if (scheduled[i]) {
+                    locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i]) + 1);
+                }
+            }
+
+            scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled, locationToNumOfAssignement);
+            return locations;
+        } catch (IOException e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    /**
+     * Schedule non-local slots to each machine
+     *
+     * @param splits
+     *            The HDFS file splits.
+     * @param workloads
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slotLimit
+     *            The maximum slots of each machine.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     * @param locationToNumOfAssignement
+     *            The number of splits already assigned to each location.
+     */
+    private void scheduleNonLocalSlots(InputSplit[] splits, final int[] workloads, String[] locations, int slotLimit,
+            boolean[] scheduled, final HashMap<String, Integer> locationToNumOfAssignement)
+                    throws IOException, UnknownHostException {
+
+        PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(NCs.length, new Comparator<String>() {
+            @Override
+            public int compare(String s1, String s2) {
+                return locationToNumOfAssignement.get(s1).compareTo(locationToNumOfAssignement.get(s2));
+            }
+
+        });
+
+        for (String nc : NCs) {
+            scheduleCadndiates.add(nc);
+        }
+        /**
+         * schedule non-local file reads
+         */
+        for (int i = 0; i < splits.length; i++) {
+            /** if there was no data-local choice, pick the NC with the fewest assignments so far */
+            if (!scheduled[i]) {
+                String selectedNcName = scheduleCadndiates.remove();
+                if (selectedNcName != null) {
+                    int ncIndex = ncNameToIndex.get(selectedNcName);
+                    workloads[ncIndex]++;
+                    scheduled[i] = true;
+                    locations[i] = selectedNcName;
+                    locationToNumOfAssignement.put(selectedNcName, workloads[ncIndex]);
+                    scheduleCadndiates.add(selectedNcName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedule data-local slots to each machine.
+     *
+     * @param splits
+     *            The HDFS file splits.
+     * @param workloads
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param random
+     *            The random generator.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
+            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits,
+            final HashMap<String, Integer> locationToNumOfAssignement) throws IOException, UnknownHostException {
+        /** order candidates by current assignment count, then inversely by their split popularity */
+        PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
+            @Override
+            public int compare(String s1, String s2) {
+                int assignmentDifference = locationToNumOfAssignement.get(s1)
+                        .compareTo(locationToNumOfAssignement.get(s2));
+                if (assignmentDifference != 0) {
+                    return assignmentDifference;
+                }
+                return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
+            }
+
+        });
+
+        for (int i = 0; i < splits.length; i++) {
+            if (scheduled[i]) {
+                continue;
+            }
+            /**
+             * get the locations of the current split
+             */
+            String[] locs = splits[i].getLocations();
+            if (locs.length > 0) {
+                scheduleCadndiates.clear();
+                for (int j = 0; j < locs.length; j++) {
+                    scheduleCadndiates.add(locs[j]);
+                }
+
+                for (String candidate : scheduleCadndiates) {
+                    /**
+                     * get all the IP addresses from the name
+                     */
+                    InetAddress[] allIps = InetAddress.getAllByName(candidate);
+                    /**
+                     * iterate over all IPs
+                     */
+                    for (InetAddress ip : allIps) {
+                        /**
+                         * if a node controller runs at this IP address
+                         */
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            /**
+                             * pick one of the NCs on this IP at random
+                             */
+                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                            int arrayPos = random.nextInt(dataLocations.size());
+                            String nc = dataLocations.get(arrayPos);
+                            int pos = ncNameToIndex.get(nc);
+                            /**
+                             * check if the node is already full
+                             */
+                            if (workloads[pos] < slots) {
+                                locations[i] = nc;
+                                workloads[pos]++;
+                                scheduled[i] = true;
+                                locationToNumOfAssignement.put(candidate,
+                                        locationToNumOfAssignement.get(candidate) + 1);
+                                break;
+                            }
+                        }
+                    }
+                    /**
+                     * stop scanning candidate locations once this split has
+                     * been scheduled
+                     */
+                    if (scheduled[i]) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Scan the splits once and build a popularity map
+     *
+     * @param splits
+     *            the split array
+     * @param locationToNumOfSplits
+     *            the map to be built
+     * @throws IOException
+     */
+    private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
+            throws IOException {
+        for (InputSplit split : splits) {
+            String[] locations = split.getLocations();
+            for (String loc : locations) {
+                IntWritable locCount = locationToNumOfSplits.get(loc);
+                if (locCount == null) {
+                    locCount = new IntWritable(0);
+                    locationToNumOfSplits.put(loc, locCount);
+                }
+                locCount.set(locCount.get() + 1);
+            }
+        }
+    }
+
+    /**
+     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+     *
+     * @param ncNameToNcInfos
+     * @throws HyracksException
+     */
+    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+        try {
+            NCs = new String[ncNameToNcInfos.size()];
+            ipToNcMapping.clear();
+            ncNameToIndex.clear();
+            int i = 0;
+
+            /**
+             * build the IP address to NC map
+             */
+            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
+                        .getHostAddress();
+                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+                if (matchedNCs == null) {
+                    matchedNCs = new ArrayList<String>();
+                    ipToNcMapping.put(ipAddr, matchedNCs);
+                }
+                matchedNCs.add(entry.getKey());
+                NCs[i] = entry.getKey();
+                i++;
+            }
+
+            /**
+             * set up the NC name to index mapping
+             */
+            for (i = 0; i < NCs.length; i++) {
+                ncNameToIndex.put(NCs[i], i);
+            }
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+}
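
End to end, the scheduler maps an array of HDFS splits to the NC names that partition constraints can be built from, preferring data-local assignments and falling back to the least-loaded NCs. A usage sketch; the input path and the cluster-controller endpoint are placeholders:

    import org.apache.asterix.external.indexing.IndexingScheduler;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public final class SchedulerExample {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            FileInputFormat.setInputPaths(conf, "hdfs://namenode:8020/data"); // placeholder
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, 0);

            // "cc-host"/1098 stand in for the cluster controller's client endpoint.
            IndexingScheduler scheduler = new IndexingScheduler("cc-host", 1098);
            String[] locations = scheduler.getLocationConstraints(splits);
            for (int i = 0; i < splits.length; i++) {
                System.out.println(splits[i] + " -> " + locations[i]); // NC reading split i
            }
        }
    }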