You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:51 UTC

[16/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 8ec422f,0000000..a301ac9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@@ -1,45 -1,0 +1,47 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.dataflow;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.IRawRecord;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.api.IRecordWithPKDataParser;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 +
 +public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
 +
 +    private final IRecordWithPKDataParser<T> dataParser;
 +
 +    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
 +            final FeedLogManager feedLogManager, final int numOfOutputFields,
-             final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
++            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
++            throws HyracksDataException {
 +        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
 +        this.dataParser = dataParser;
 +    }
 +
 +    @Override
 +    protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
 +        dataParser.appendKeys(tb, record);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 370eec0,0000000..aac7be2
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@@ -1,42 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.dataflow;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.IRawRecord;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.parser.RecordWithMetadataParser;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 +
 +public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
 +
 +    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
 +            final FeedLogManager feedLogManager, final int numOfOutputFields,
-             final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
++            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
++            throws HyracksDataException {
 +        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
 +    }
 +
 +    @Override
 +    protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
 +        dataParser.appendPK(tb);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 6401234,0000000..a092620
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@@ -1,169 -1,0 +1,172 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.dataflow;
 +
 +import java.io.IOException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import javax.annotation.Nonnull;
 +
 +import org.apache.asterix.external.api.IRawRecord;
 +import org.apache.asterix.external.api.IRecordDataParser;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.hyracks.api.comm.IFrameWriter;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 +import org.apache.log4j.Logger;
 +
 +public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
 +    private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
 +    protected final IRecordDataParser<T> dataParser;
 +    protected final IRecordReader<? extends T> recordReader;
 +    protected final AtomicBoolean closed = new AtomicBoolean(false);
 +    protected final long interval = 1000;
 +    protected boolean failed = false;
 +
 +    public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
 +            @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-             @Nonnull IRecordReader<T> recordReader) {
++            @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
 +        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
 +        this.dataParser = dataParser;
 +        this.recordReader = recordReader;
 +        recordReader.setFeedLogManager(feedLogManager);
 +        recordReader.setController(this);
 +    }
 +
 +    @Override
 +    public void start(IFrameWriter writer) throws HyracksDataException {
 +        HyracksDataException hde = null;
 +        try {
 +            failed = false;
 +            tupleForwarder.initialize(ctx, writer);
 +            while (recordReader.hasNext()) {
 +                IRawRecord<? extends T> record = recordReader.next();
 +                if (record == null) {
 +                    flush();
 +                    Thread.sleep(interval);
 +                    continue;
 +                }
 +                tb.reset();
 +                try {
 +                    dataParser.parse(record, tb.getDataOutput());
 +                } catch (Exception e) {
 +                    e.printStackTrace();
 +                    LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
 +                    feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
 +                    continue;
 +                }
 +                tb.addFieldEndOffset();
 +                addMetaPart(tb, record);
 +                addPrimaryKeys(tb, record);
 +                if (tb.getSize() > tupleForwarder.getMaxRecordSize()) {
 +                    // log
 +                    feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_LARGE_RECORD);
 +                    continue;
 +                }
 +                tupleForwarder.addTuple(tb);
 +            }
++        } catch (InterruptedException e) {
++            //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
++            LOGGER.warn("Feed has been interrupted. Closing the feed");
 +        } catch (Exception e) {
 +            failed = true;
 +            tupleForwarder.flush();
 +            LOGGER.warn("Failure while operating a feed source", e);
 +            throw new HyracksDataException(e);
 +        }
 +        try {
 +            tupleForwarder.close();
 +        } catch (Throwable th) {
 +            hde = ExternalDataExceptionUtils.suppress(hde, th);
 +        }
 +        try {
 +            recordReader.close();
 +        } catch (Throwable th) {
 +            LOGGER.warn("Failure during while operating a feed sourcec", th);
 +            hde = ExternalDataExceptionUtils.suppress(hde, th);
 +        } finally {
 +            closeSignal();
 +            if (hde != null) {
 +                throw hde;
 +            }
 +        }
 +    }
 +
 +    protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
 +    }
 +
 +    protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
 +    }
 +
 +    private void closeSignal() {
 +        synchronized (closed) {
 +            closed.set(true);
 +            closed.notifyAll();
 +        }
 +    }
 +
 +    private void waitForSignal() throws InterruptedException {
 +        synchronized (closed) {
 +            while (!closed.get()) {
 +                closed.wait();
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public boolean stop() throws HyracksDataException {
 +        HyracksDataException hde = null;
 +        if (recordReader.stop()) {
 +            if (failed) {
 +                // failed, close here
 +                try {
 +                    tupleForwarder.close();
 +                } catch (Throwable th) {
 +                    hde = ExternalDataExceptionUtils.suppress(hde, th);
 +                }
 +                try {
 +                    recordReader.close();
 +                } catch (Throwable th) {
 +                    hde = ExternalDataExceptionUtils.suppress(hde, th);
 +                }
 +                if (hde != null) {
 +                    throw hde;
 +                }
 +            } else {
 +                try {
 +                    waitForSignal();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean handleException(Throwable th) {
 +        // This is not a parser record. most likely, this error happened in the record reader.
 +        return recordReader.handleException(th);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 203b5a7,0000000..e7c396b
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@@ -1,47 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.dataflow;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.IRawRecord;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.parser.RecordWithMetadataParser;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 +
 +public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
 +
 +    //This field mask a super class field dataParser. We do this to avoid down-casting when calling parseMeta
 +    protected RecordWithMetadataParser<T, O> dataParser;
 +
 +    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
 +            FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
-             IRecordReader<T> recordReader) {
++            IRecordReader<T> recordReader) throws HyracksDataException {
 +        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
 +        this.dataParser = dataParser;
 +    }
 +
 +    @Override
 +    protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
 +        dataParser.parseMeta(tb.getDataOutput());
 +        tb.addFieldEndOffset();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index de185e0,0000000..529977a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@@ -1,201 -1,0 +1,229 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
++import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.api.IIndexibleExternalDataSource;
- import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.indexing.ExternalFile;
 +import org.apache.asterix.external.indexing.IndexingScheduler;
++import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
 +import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
++import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 +import org.apache.asterix.external.input.stream.HDFSInputStream;
 +import org.apache.asterix.external.provider.ExternalIndexerProvider;
++import org.apache.asterix.external.provider.StreamRecordReaderProvider;
++import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
++import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +import org.apache.asterix.external.util.HDFSUtils;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapred.InputSplit;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hadoop.mapred.RecordReader;
 +import org.apache.hadoop.mapred.Reporter;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 +import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
 +import org.apache.hyracks.hdfs.scheduler.Scheduler;
 +
- public class HDFSDataSourceFactory
-         implements IInputStreamFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
++public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
 +
 +    protected static final long serialVersionUID = 1L;
 +    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 +    protected String[] readSchedule;
 +    protected boolean read[];
 +    protected InputSplitsFactory inputSplitsFactory;
 +    protected ConfFactory confFactory;
 +    protected boolean configured = false;
 +    protected static Scheduler hdfsScheduler;
 +    protected static IndexingScheduler indexingScheduler;
 +    protected static Boolean initialized = false;
 +    protected static Object initLock = new Object();
 +    protected List<ExternalFile> files;
 +    protected Map<String, String> configuration;
 +    protected Class<?> recordClass;
 +    protected boolean indexingOp = false;
 +    private JobConf conf;
 +    private InputSplit[] inputSplits;
 +    private String nodeName;
++    private Format format;
 +
 +    @Override
 +    public void configure(Map<String, String> configuration) throws AsterixException {
 +        try {
 +            init();
 +            this.configuration = configuration;
 +            JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
 +            confFactory = new ConfFactory(conf);
 +            clusterLocations = getPartitionConstraint();
 +            int numPartitions = clusterLocations.getLocations().length;
 +            // if files list was set, we restrict the splits to the list
 +            InputSplit[] inputSplits;
 +            if (files == null) {
 +                inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
 +            } else {
 +                inputSplits = HDFSUtils.getSplits(conf, files);
 +            }
 +            if (indexingOp) {
 +                readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
 +            } else {
 +                readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
 +            }
 +            inputSplitsFactory = new InputSplitsFactory(inputSplits);
 +            read = new boolean[readSchedule.length];
 +            Arrays.fill(read, false);
-             if (!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)) {
++            String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
++            if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
 +                RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
 +                this.recordClass = reader.createValue().getClass();
 +                reader.close();
++            } else {
++                format = StreamRecordReaderProvider.getReaderFormat(configuration);
++                this.recordClass = char[].class;
 +            }
 +        } catch (IOException e) {
 +            throw new AsterixException(e);
 +        }
 +    }
 +
 +    // Used to tell the factory to restrict the splits to the intersection between this list a
 +    // actual files on hde
 +    @Override
 +    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
 +        this.files = files;
 +        this.indexingOp = indexingOp;
 +    }
 +
 +    /*
 +     * The method below was modified to take care of the following
 +     * 1. when target files are not null, it generates a file aware input stream that validate
 +     * against the files
 +     * 2. if the data is binary, it returns a generic reader */
-     @Override
-     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
++    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
++            throws HyracksDataException {
 +        try {
 +            if (!configured) {
 +                conf = confFactory.getConf();
 +                inputSplits = inputSplitsFactory.getSplits();
 +                nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
 +                configured = true;
 +            }
-             return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files);
++            return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
 +        } catch (Exception e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
 +    /**
 +     * Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
 +     * external
 +     * adapter will run and how many threads per node.
 +     *
 +     * @return
 +     */
 +    @Override
 +    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
 +        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
 +        return clusterLocations;
 +    }
 +
 +    /**
 +     * This method initialize the scheduler which assigns responsibility of reading different logical input splits from
 +     * HDFS
 +     */
 +    private static void init() {
 +        if (!initialized) {
 +            synchronized (initLock) {
 +                if (!initialized) {
 +                    hdfsScheduler = HDFSUtils.initializeHDFSScheduler();
 +                    indexingScheduler = HDFSUtils.initializeIndexingHDFSScheduler();
 +                    initialized = true;
 +                }
 +            }
 +        }
 +    }
 +
 +    public JobConf getJobConf() throws HyracksDataException {
 +        return confFactory.getConf();
 +    }
 +
 +    @Override
 +    public DataSourceType getDataSourceType() {
 +        return ExternalDataUtils.getDataSourceType(configuration);
 +    }
 +
++    /**
++     * HDFS Datasource is a special case in two ways:
++     * 1. It supports indexing.
++     * 2. It returns input as a set of writable object that we sometimes internally transform into a byte stream
++     * Hence, it can produce:
++     * 1. StreamRecordReader: When we transform the input into a byte stream.
++     * 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
++     * 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
++     */
 +    @Override
-     public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
++    public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
 +            throws HyracksDataException {
 +        try {
++            IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
++            if (format != null) {
++                StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format,
++                        createInputStream(ctx, partition, indexer), configuration);
++                if (indexer != null) {
++                    return new IndexingStreamRecordReader(streamReader, indexer);
++                } else {
++                    return streamReader;
++                }
++            }
 +            JobConf conf = confFactory.getConf();
 +            InputSplit[] inputSplits = inputSplitsFactory.getSplits();
 +            String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
 +            return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
-                     files == null ? null : ExternalIndexerProvider.getIndexer(configuration));
++                    indexer);
 +        } catch (Exception e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
 +    @Override
 +    public Class<?> getRecordClass() {
 +        return recordClass;
 +    }
 +
 +    @Override
 +    public boolean isIndexible() {
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean isIndexingOp() {
 +        return ((files != null) && indexingOp);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 6964a82,0000000..aa0451a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@@ -1,122 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.record.reader.stream;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
- import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +
- public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
++public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 +
-     public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
-         super(inputStream, indexer);
++    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
++        super(inputStream);
 +    }
 +
 +    private boolean prevCharCR;
 +    private boolean prevCharLF;
 +    private int newlineLength;
 +    private int readLength;
 +
 +    @Override
 +    public boolean hasNext() throws IOException {
 +        if (done) {
 +            return false;
 +        }
 +        if (!skipWhiteSpace()) {
 +            done = true;
 +            close();
 +            return false;
 +        }
 +        newlineLength = 0;
 +        prevCharCR = false;
 +        prevCharLF = false;
 +        record.reset();
 +        readLength = 0;
 +        do {
 +            int startPosn = bufferPosn; //starting from where we left off the last time
 +            if (bufferPosn >= bufferLength) {
 +                startPosn = bufferPosn = 0;
 +                bufferLength = reader.read(inputBuffer);
 +                if (bufferLength <= 0) {
 +                    if (readLength > 0) {
 +                        record.endRecord();
 +                        return true;
 +                    }
 +                    close();
 +                    return false; //EOF
 +                }
 +            }
 +            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for two consecutive newlines
 +                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
 +                    if (prevCharLF) {
 +                        // \n\n
 +                        ++bufferPosn; // at next invocation proceed from following byte
 +                        newlineLength = 2;
 +                        break;
 +                    } else if (prevCharCR) {
 +                        newlineLength += 1;
 +                    }
 +                    prevCharLF = true;
 +                } else {
 +                    prevCharLF = false;
 +                }
 +                if (inputBuffer[bufferPosn] == ExternalDataConstants.CR) { //CR + notLF, we are at notLF
 +                    if (prevCharCR) {
 +                        // \cr\cr
 +                        newlineLength = 2;
 +                        break;
 +                    }
 +                    prevCharCR = true;
 +                } else {
 +                    prevCharCR = false;
 +                }
 +                if (!(prevCharCR || prevCharLF)) {
 +                    newlineLength = 0;
 +                }
 +            }
 +            readLength = bufferPosn - startPosn;
 +            if (readLength > 0) {
 +                record.append(inputBuffer, startPosn, readLength);
 +            }
 +        } while (newlineLength < 2);
 +        record.endRecord();
 +        return true;
 +    }
 +
 +    private boolean skipWhiteSpace() throws IOException {
 +        // start by skipping white spaces
 +        while (true) {
 +            if (bufferPosn < bufferLength) {
 +                if (!Character.isWhitespace(inputBuffer[bufferPosn])) {
 +                    return true;
 +                }
 +                bufferPosn++;
 +            } else {
 +                // fill buffer
 +                bufferPosn = 0;
 +                bufferLength = reader.read(inputBuffer);
 +                if (bufferLength < 0) {
 +                    return false;
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 3089295,0000000..59b72e4
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@@ -1,114 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.record.reader.stream;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
- import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
- public class LineRecordReader extends AbstractStreamRecordReader {
++public class LineRecordReader extends StreamRecordReader {
 +
++    private final boolean hasHeader;
 +    protected boolean prevCharCR;
 +    protected int newlineLength;
 +    protected int recordNumber = 0;
++    protected boolean nextIsHeader = false;
 +
-     public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final IExternalIndexer indexer)
-             throws HyracksDataException {
-         super(stream, indexer);
-         try {
-             if (hasHeader) {
-                 if (hasNext()) {
-                     next();
-                 }
-             }
-         } catch (final IOException e) {
-             throw new HyracksDataException(e);
++    public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
++        super(stream);
++        this.hasHeader = hasHeader;
++        if (hasHeader) {
++            stream.setNotificationHandler(this);
 +        }
++    }
 +
++    @Override
++    public void notifyNewSource() {
++        if (hasHeader) {
++            nextIsHeader = true;
++        }
 +    }
 +
 +    @Override
 +    public boolean hasNext() throws IOException {
-         if (done) {
-             return false;
-         }
-         /*
-          * We're reading data from in, but the head of the stream may be
-          * already buffered in buffer, so we have several cases:
-          * 1. No newline characters are in the buffer, so we need to copy
-          *   everything and read another buffer from the stream.
-          * 2. An unambiguously terminated line is in buffer, so we just
-          *    copy to record.
-          * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-          *    in CR. In this case we copy everything up to CR to record, but
-          * we also need to see what follows CR: if it's LF, then we
-          * need consume LF as well, so next call to readLine will read
-          * from after that.
-          * We use a flag prevCharCR to signal if previous character was CR
-          * and, if it happens to be at the end of the buffer, delay
-          * consuming it until we have a chance to look at the char that
-          * follows.
-          */
-         newlineLength = 0; //length of terminating newline
-         prevCharCR = false; //true of prev char was CR
-         record.reset();
-         int readLength = 0;
-         do {
-             int startPosn = bufferPosn; //starting from where we left off the last time
-             if (bufferPosn >= bufferLength) {
-                 startPosn = bufferPosn = 0;
-                 bufferLength = reader.read(inputBuffer);
-                 if (bufferLength <= 0) {
-                     if (readLength > 0) {
-                         record.endRecord();
-                         recordNumber++;
-                         return true;
++        while (true) {
++            if (done) {
++                return false;
++            }
++            /*
++             * We're reading data from in, but the head of the stream may be
++             * already buffered in buffer, so we have several cases:
++             * 1. No newline characters are in the buffer, so we need to copy
++             *   everything and read another buffer from the stream.
++             * 2. An unambiguously terminated line is in buffer, so we just
++             *    copy to record.
++             * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
++             *    in CR. In this case we copy everything up to CR to record, but
++             * we also need to see what follows CR: if it's LF, then we
++             * need consume LF as well, so next call to readLine will read
++             * from after that.
++             * We use a flag prevCharCR to signal if previous character was CR
++             * and, if it happens to be at the end of the buffer, delay
++             * consuming it until we have a chance to look at the char that
++             * follows.
++             */
++            newlineLength = 0; //length of terminating newline
++            prevCharCR = false; //true of prev char was CR
++            record.reset();
++            int readLength = 0;
++            do {
++                int startPosn = bufferPosn; //starting from where we left off the last time
++                if (bufferPosn >= bufferLength) {
++                    startPosn = bufferPosn = 0;
++                    bufferLength = reader.read(inputBuffer);
++                    if (bufferLength <= 0) {
++                        if (readLength > 0) {
++                            record.endRecord();
++                            recordNumber++;
++                            return true;
++                        }
++                        close();
++                        return false; //EOF
 +                    }
-                     close();
-                     return false; //EOF
 +                }
-             }
-             for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-                 if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                     newlineLength = (prevCharCR) ? 2 : 1;
-                     ++bufferPosn; // at next invocation proceed from following byte
-                     break;
++                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
++                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
++                        newlineLength = (prevCharCR) ? 2 : 1;
++                        ++bufferPosn; // at next invocation proceed from following byte
++                        break;
++                    }
++                    if (prevCharCR) { //CR + notLF, we are at notLF
++                        newlineLength = 1;
++                        break;
++                    }
++                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
 +                }
-                 if (prevCharCR) { //CR + notLF, we are at notLF
-                     newlineLength = 1;
-                     break;
++                readLength = bufferPosn - startPosn;
++                if (prevCharCR && newlineLength == 0) {
++                    --readLength; //CR at the end of the buffer
++                    prevCharCR = false;
 +                }
-                 prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-             }
-             readLength = bufferPosn - startPosn;
-             if (prevCharCR && newlineLength == 0) {
-                 --readLength; //CR at the end of the buffer
-                 prevCharCR = false;
-             }
-             if (readLength > 0) {
-                 record.append(inputBuffer, startPosn, readLength);
++                if (readLength > 0) {
++                    record.append(inputBuffer, startPosn, readLength);
++                }
++            } while (newlineLength == 0);
++            if (nextIsHeader) {
++                nextIsHeader = false;
++                continue;
 +            }
-         } while (newlineLength == 0);
-         recordNumber++;
-         return true;
++            recordNumber++;
++            return true;
++        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index abd2952,0000000..88964a1
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@@ -1,119 -1,0 +1,125 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.record.reader.stream;
 +
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +public class QuotedLineRecordReader extends LineRecordReader {
 +
 +    private final char quote;
 +    private boolean prevCharEscape;
 +    private boolean inQuote;
 +
-     public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream,
-             final IExternalIndexer indexer, final String quoteString) throws HyracksDataException {
-         super(hasHeader, stream, indexer);
++    public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString)
++            throws HyracksDataException {
++        super(hasHeader, stream);
 +        if ((quoteString == null) || (quoteString.length() != 1)) {
 +            throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
 +                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
 +        }
 +        this.quote = quoteString.charAt(0);
 +    }
 +
 +    @Override
 +    public boolean hasNext() throws IOException {
-         if (done) {
-             return false;
-         }
-         newlineLength = 0;
-         prevCharCR = false;
-         prevCharEscape = false;
-         record.reset();
-         int readLength = 0;
-         inQuote = false;
-         do {
-             int startPosn = bufferPosn;
-             if (bufferPosn >= bufferLength) {
-                 startPosn = bufferPosn = 0;
-                 bufferLength = reader.read(inputBuffer);
-                 if (bufferLength <= 0) {
-                     {
-                         if (readLength > 0) {
-                             if (inQuote) {
-                                 throw new IOException("malformed input record ended inside quote");
++        while (true) {
++            if (done) {
++                return false;
++            }
++            newlineLength = 0;
++            prevCharCR = false;
++            prevCharEscape = false;
++            record.reset();
++            int readLength = 0;
++            inQuote = false;
++            do {
++                int startPosn = bufferPosn;
++                if (bufferPosn >= bufferLength) {
++                    startPosn = bufferPosn = 0;
++                    bufferLength = reader.read(inputBuffer);
++                    if (bufferLength <= 0) {
++                        {
++                            if (readLength > 0) {
++                                if (inQuote) {
++                                    throw new IOException("malformed input record ended inside quote");
++                                }
++                                record.endRecord();
++                                recordNumber++;
++                                return true;
 +                            }
-                             record.endRecord();
-                             recordNumber++;
-                             return true;
++                            close();
++                            return false;
 +                        }
-                         close();
-                         return false;
 +                    }
 +                }
-             }
-             for (; bufferPosn < bufferLength; ++bufferPosn) {
-                 if (!inQuote) {
-                     if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                         newlineLength = (prevCharCR) ? 2 : 1;
-                         ++bufferPosn;
-                         break;
-                     }
-                     if (prevCharCR) {
-                         newlineLength = 1;
-                         break;
-                     }
-                     prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-                     if (inputBuffer[bufferPosn] == quote) {
-                         if (!prevCharEscape) {
-                             inQuote = true;
++                for (; bufferPosn < bufferLength; ++bufferPosn) {
++                    if (!inQuote) {
++                        if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
++                            newlineLength = (prevCharCR) ? 2 : 1;
++                            ++bufferPosn;
++                            break;
++                        }
++                        if (prevCharCR) {
++                            newlineLength = 1;
++                            break;
++                        }
++                        prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
++                        if (inputBuffer[bufferPosn] == quote) {
++                            if (!prevCharEscape) {
++                                inQuote = true;
++                            }
++                        }
++                        if (prevCharEscape) {
++                            prevCharEscape = false;
++                        } else {
++                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
 +                        }
-                     }
-                     if (prevCharEscape) {
-                         prevCharEscape = false;
 +                    } else {
-                         prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
-                     }
-                 } else {
-                     // only look for next quote
-                     if (inputBuffer[bufferPosn] == quote) {
-                         if (!prevCharEscape) {
-                             inQuote = false;
++                        // only look for next quote
++                        if (inputBuffer[bufferPosn] == quote) {
++                            if (!prevCharEscape) {
++                                inQuote = false;
++                            }
 +                        }
++                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
 +                    }
-                     prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
 +                }
++                readLength = bufferPosn - startPosn;
++                if (prevCharCR && newlineLength == 0) {
++                    --readLength;
++                }
++                if (readLength > 0) {
++                    record.append(inputBuffer, startPosn, readLength);
++                }
++            } while (newlineLength == 0);
++            if (nextIsHeader) {
++                nextIsHeader = false;
++                continue;
 +            }
-             readLength = bufferPosn - startPosn;
-             if (prevCharCR && newlineLength == 0) {
-                 --readLength;
-             }
-             if (readLength > 0) {
-                 record.append(inputBuffer, startPosn, readLength);
-             }
-         } while (newlineLength == 0);
-         recordNumber++;
-         return true;
++            recordNumber++;
++            return true;
++        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7339bfd,0000000..26ac3cb
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@@ -1,164 -1,0 +1,164 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.record.reader.stream;
 +
 +import java.io.IOException;
 +
- import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataExceptionUtils;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
- public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
++public class SemiStructuredRecordReader extends StreamRecordReader {
 +
 +    private int depth;
 +    private boolean prevCharEscape;
 +    private boolean inString;
 +    private char recordStart;
 +    private char recordEnd;
 +    private int recordNumber = 0;
 +
-     public SemiStructuredRecordReader(AsterixInputStream stream, IExternalIndexer indexer, String recStartString,
-             String recEndString) throws AsterixException {
-         super(stream, indexer);
++    public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString)
++            throws HyracksDataException {
++        super(stream);
 +        // set record opening char
 +        if (recStartString != null) {
 +            if (recStartString.length() != 1) {
-                 throw new AsterixException(
++                throw new HyracksDataException(
 +                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
 +                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
 +            }
 +            recordStart = recStartString.charAt(0);
 +        } else {
 +            recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
 +        }
 +        // set record ending char
 +        if (recEndString != null) {
 +            if (recEndString.length() != 1) {
-                 throw new AsterixException(
++                throw new HyracksDataException(
 +                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
 +                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
 +            }
 +            recordEnd = recEndString.charAt(0);
 +        } else {
 +            recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
 +        }
 +    }
 +
 +    public int getRecordNumber() {
 +        return recordNumber;
 +    }
 +
 +    @Override
-     public boolean hasNext() throws Exception {
++    public boolean hasNext() throws IOException {
 +        if (done) {
 +            return false;
 +        }
 +        record.reset();
 +        boolean hasStarted = false;
 +        boolean hasFinished = false;
 +        prevCharEscape = false;
 +        inString = false;
 +        depth = 0;
 +        do {
 +            int startPosn = bufferPosn; // starting from where we left off the last time
 +            if (bufferPosn >= bufferLength) {
 +                startPosn = bufferPosn = 0;
 +                bufferLength = reader.read(inputBuffer);
 +                if (bufferLength < 0) {
 +                    close();
 +                    return false; // EOF
 +                }
 +            }
 +            if (!hasStarted) {
 +                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
 +                    if (inputBuffer[bufferPosn] == recordStart) {
 +                        startPosn = bufferPosn;
 +                        hasStarted = true;
 +                        depth = 1;
 +                        ++bufferPosn; // at next invocation proceed from following byte
 +                        break;
 +                    } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
 +                            && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
 +                            && inputBuffer[bufferPosn] != ExternalDataConstants.LF
 +                            && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
 +                        // corrupted file. clear the buffer and stop reading
 +                        reader.reset();
 +                        bufferPosn = bufferLength = 0;
 +                        throw new IOException("Malformed input stream");
 +                    }
 +                }
 +            }
 +            if (hasStarted) {
 +                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
 +                    if (inString) {
 +                        // we are in a string, we only care about the string end
 +                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
 +                            inString = false;
 +                        }
 +                        if (prevCharEscape) {
 +                            prevCharEscape = false;
 +                        } else {
 +                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
 +                        }
 +                    } else {
 +                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
 +                            inString = true;
 +                        } else if (inputBuffer[bufferPosn] == recordStart) {
 +                            depth += 1;
 +                        } else if (inputBuffer[bufferPosn] == recordEnd) {
 +                            depth -= 1;
 +                            if (depth == 0) {
 +                                hasFinished = true;
 +                                ++bufferPosn; // at next invocation proceed from following byte
 +                                break;
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +
 +            int appendLength = bufferPosn - startPosn;
 +            if (appendLength > 0) {
 +                try {
 +                    record.append(inputBuffer, startPosn, appendLength);
 +                } catch (IOException e) {
 +                    reader.reset();
 +                    bufferPosn = bufferLength = 0;
 +                    throw new IOException("Malformed input stream");
 +                }
 +            }
 +        } while (!hasFinished);
 +        record.endRecord();
 +        recordNumber++;
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean stop() {
 +        try {
 +            reader.stop();
 +        } catch (Exception e) {
 +            e.printStackTrace();
 +            return false;
 +        }
 +        return true;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 7ca185f,0000000..541737a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@@ -1,145 -1,0 +1,150 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.record.reader.twitter;
 +
 +import java.util.Map;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.util.ExternalDataConstants;
- import org.apache.asterix.external.util.ExternalDataUtils;
 +import org.apache.asterix.external.util.TwitterUtil;
 +import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
 +import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +import twitter4j.FilterQuery;
 +import twitter4j.Status;
 +
 +public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
 +
 +    private static final long serialVersionUID = 1L;
 +    private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
 +
 +    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
 +    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
 +
 +    private Map<String, String> configuration;
 +    private boolean pull;
 +    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 +
 +    @Override
 +    public DataSourceType getDataSourceType() {
 +        return DataSourceType.RECORDS;
 +    }
 +
 +    @Override
 +    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
 +        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
 +        return clusterLocations;
 +    }
 +
 +    @Override
 +    public void configure(Map<String, String> configuration) throws AsterixException {
 +        this.configuration = configuration;
 +        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
 +        if (!validateConfiguration(configuration)) {
 +            StringBuilder builder = new StringBuilder();
 +            builder.append("One or more parameters are missing from adapter configuration\n");
 +            builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
 +            builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
 +            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
-             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
++            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
 +            throw new AsterixException(builder.toString());
 +        }
-         if (ExternalDataUtils.isPull(configuration)) {
++        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
 +            pull = true;
 +            if (configuration.get(SearchAPIConstants.QUERY) == null) {
 +                throw new AsterixException(
 +                        "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
 +            }
 +            String interval = configuration.get(SearchAPIConstants.INTERVAL);
 +            if (interval != null) {
 +                try {
 +                    Integer.parseInt(interval);
 +                } catch (NumberFormatException nfe) {
 +                    throw new IllegalArgumentException(
 +                            "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
 +                }
 +            } else {
 +                configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
 +                if (LOGGER.isLoggable(Level.WARNING)) {
 +                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
 +                            + DEFAULT_INTERVAL + ")");
 +                }
 +            }
-         } else if (ExternalDataUtils.isPush(configuration)) {
-             pull = false;
 +        } else {
-             throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
-                     + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
++            pull = false;
++        }
++    }
++
++    public static boolean isTwitterPull(Map<String, String> configuration) {
++        String reader = configuration.get(ExternalDataConstants.KEY_READER);
++        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
++                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
++            return true;
 +        }
++        return false;
 +    }
 +
 +    @Override
 +    public boolean isIndexible() {
 +        return false;
 +    }
 +
 +    @Override
 +    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
 +            throws HyracksDataException {
 +        if (pull) {
 +            return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
 +                    configuration.get(SearchAPIConstants.QUERY),
 +                    Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
 +        } else {
 +            FilterQuery query;
 +            try {
 +                query = TwitterUtil.getFilterQuery(configuration);
 +                return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
 +                        : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
 +            } catch (AsterixException e) {
 +                throw new HyracksDataException(e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public Class<? extends Status> getRecordClass() {
 +        return Status.class;
 +    }
 +
 +    private boolean validateConfiguration(Map<String, String> configuration) {
 +        String consumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
 +        String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
 +        String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
 +        String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
 +        if ((consumerKey == null) || (consumerSecret == null) || (accessToken == null) || (tokenSecret == null)) {
 +            return false;
 +        }
 +        return true;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 7e280a5,0000000..94333d1
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@@ -1,120 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream;
 +
 +import java.io.IOException;
 +import java.io.Reader;
 +import java.nio.ByteBuffer;
 +import java.nio.CharBuffer;
 +import java.nio.charset.CharsetDecoder;
 +import java.nio.charset.StandardCharsets;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +public class AsterixInputStreamReader extends Reader {
 +    private AsterixInputStream in;
 +    private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
 +    private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
 +    private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
 +    private CharsetDecoder decoder;
 +    private boolean done = false;
 +
 +    public AsterixInputStreamReader(AsterixInputStream in) {
 +        this.in = in;
 +        this.decoder = StandardCharsets.UTF_8.newDecoder();
 +        this.byteBuffer.flip();
 +    }
 +
 +    public void stop() throws IOException {
 +        try {
 +            in.stop();
 +        } catch (Exception e) {
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    public void setController(AbstractFeedDataFlowController controller) {
 +        in.setController(controller);
 +    }
 +
-     public void setFeedLogManager(FeedLogManager feedLogManager) {
++    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
 +        in.setFeedLogManager(feedLogManager);
 +    }
 +
 +    @Override
 +    public int read(char cbuf[]) throws IOException {
 +        return read(cbuf, 0, cbuf.length);
 +    }
 +
 +    @Override
 +    public int read(char cbuf[], int offset, int length) throws IOException {
 +        if (done) {
 +            return -1;
 +        }
 +        int len = 0;
 +        charBuffer.clear();
 +        while (charBuffer.position() == 0) {
 +            if (byteBuffer.hasRemaining()) {
 +                decoder.decode(byteBuffer, charBuffer, false);
 +                System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
 +                if (charBuffer.position() > 0) {
 +                    return charBuffer.position();
 +                } else {
 +                    // need to read more data
 +                    System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
 +                    byteBuffer.position(byteBuffer.remaining());
 +                    while (len == 0) {
 +                        len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
 +                    }
 +                }
 +            } else {
 +                byteBuffer.clear();
 +                while (len == 0) {
 +                    len = in.read(bytes, 0, bytes.length);
 +                }
 +            }
 +            if (len == -1) {
 +                done = true;
 +                return len;
 +            }
 +            byteBuffer.position(len);
 +            byteBuffer.flip();
 +            decoder.decode(byteBuffer, charBuffer, false);
 +            System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
 +        }
 +        return charBuffer.position();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +        in.close();
 +    }
 +
 +    public boolean handleException(Throwable th) {
 +        return in.handleException(th);
 +    }
 +
 +    @Override
 +    public void reset() throws IOException {
 +        byteBuffer.limit(0);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 063b8fa,0000000..997c254
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@@ -1,237 -1,0 +1,234 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IExternalIndexer;
 +import org.apache.asterix.external.api.IIndexingDatasource;
 +import org.apache.asterix.external.indexing.ExternalFile;
 +import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
- import org.apache.asterix.external.provider.ExternalIndexerProvider;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapred.FileSplit;
 +import org.apache.hadoop.mapred.InputFormat;
 +import org.apache.hadoop.mapred.InputSplit;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hadoop.mapred.RecordReader;
 +import org.apache.hadoop.mapred.Reporter;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +public class HDFSInputStream extends AsterixInputStream implements IIndexingDatasource {
 +
 +    private RecordReader<Object, Text> reader;
 +    private Text value = null;
 +    private Object key = null;
 +    private int currentSplitIndex = 0;
 +    private boolean read[];
 +    private InputFormat<?, Text> inputFormat;
 +    private InputSplit[] inputSplits;
 +    private String[] readSchedule;
 +    private String nodeName;
 +    private JobConf conf;
 +    // Indexing variables
 +    private final IExternalIndexer indexer;
 +    private final List<ExternalFile> snapshot;
 +    private final FileSystem hdfs;
 +    private int pos = 0;
 +
 +    @SuppressWarnings("unchecked")
 +    public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-             JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot)
-                     throws IOException, AsterixException {
++            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot, IExternalIndexer indexer)
++            throws IOException, AsterixException {
 +        this.read = read;
 +        this.inputSplits = inputSplits;
 +        this.readSchedule = readSchedule;
 +        this.nodeName = nodeName;
 +        this.conf = conf;
 +        this.inputFormat = conf.getInputFormat();
 +        this.reader = new EmptyRecordReader<Object, Text>();
 +        this.snapshot = snapshot;
 +        this.hdfs = FileSystem.get(conf);
++        this.indexer = indexer;
 +        nextInputSplit();
 +        this.value = new Text();
 +        if (snapshot != null) {
-             this.indexer = ExternalIndexerProvider.getIndexer(configuration);
 +            if (currentSplitIndex < snapshot.size()) {
 +                indexer.reset(this);
 +            }
-         } else {
-             this.indexer = null;
 +        }
 +    }
 +
 +    @Override
 +    public int read() throws IOException {
 +        if (value.getLength() < pos) {
 +            if (!readMore()) {
 +                return -1;
 +            }
 +        } else if (value.getLength() == pos) {
 +            pos++;
 +            return ExternalDataConstants.BYTE_LF;
 +        }
 +        return value.getBytes()[pos++];
 +    }
 +
 +    private int readRecord(byte[] buffer, int offset, int len) {
 +        int actualLength = value.getLength() + 1;
 +        if ((actualLength - pos) > len) {
 +            //copy partial record
 +            System.arraycopy(value.getBytes(), pos, buffer, offset, len);
 +            pos += len;
 +            return len;
 +        } else {
 +            int numBytes = value.getLength() - pos;
 +            System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
 +            buffer[offset + numBytes] = ExternalDataConstants.LF;
 +            pos += numBytes;
 +            numBytes++;
 +            return numBytes;
 +        }
 +    }
 +
 +    @Override
 +    public int read(byte[] buffer, int offset, int len) throws IOException {
 +        if (value.getLength() > pos) {
 +            return readRecord(buffer, offset, len);
 +        }
 +        if (!readMore()) {
 +            return -1;
 +        }
 +        return readRecord(buffer, offset, len);
 +    }
 +
 +    private boolean readMore() throws IOException {
 +        try {
 +            pos = 0;
 +            return HDFSInputStream.this.hasNext();
 +        } catch (Exception e) {
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    @Override
 +    public boolean stop() throws Exception {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean handleException(Throwable th) {
 +        return false;
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +        reader.close();
 +    }
 +
 +    private boolean hasNext() throws Exception {
 +        if (reader.next(key, value)) {
 +            return true;
 +        }
 +        while (nextInputSplit()) {
 +            if (reader.next(key, value)) {
 +                return true;
 +            }
 +        }
 +        return false;
 +    }
 +
 +    private boolean nextInputSplit() throws IOException {
 +        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
 +            /**
 +             * read all the partitions scheduled to the current node
 +             */
 +            if (readSchedule[currentSplitIndex].equals(nodeName)) {
 +                /**
 +                 * pick an unread split to read synchronize among
 +                 * simultaneous partitions in the same machine
 +                 */
 +                synchronized (read) {
 +                    if (read[currentSplitIndex] == false) {
 +                        read[currentSplitIndex] = true;
 +                    } else {
 +                        continue;
 +                    }
 +                }
 +                if (snapshot != null) {
 +                    String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
 +                    FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
 +                    // Skip if not the same file stored in the files snapshot
 +                    if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
 +                            .getTime()) {
 +                        continue;
 +                    }
 +                }
 +
 +                reader.close();
 +                reader = getRecordReader(currentSplitIndex);
 +                return true;
 +            }
 +        }
 +        return false;
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException {
 +        reader = (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
 +        if (key == null) {
 +            key = reader.createKey();
 +            value = reader.createValue();
 +        }
 +        if (indexer != null) {
 +            try {
 +                indexer.reset(this);
 +            } catch (Exception e) {
 +                throw new HyracksDataException(e);
 +            }
 +        }
 +        return reader;
 +    }
 +
 +    @Override
 +    public IExternalIndexer getIndexer() {
 +        return indexer;
 +    }
 +
 +    @Override
 +    public List<ExternalFile> getSnapshot() {
 +        return snapshot;
 +    }
 +
 +    @Override
 +    public int getCurrentSplitIndex() {
 +        return currentSplitIndex;
 +    }
 +
 +    @Override
 +    public RecordReader<?, ? extends Writable> getReader() {
 +        return reader;
 +    }
 +}