You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:51 UTC
[16/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master'
into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 8ec422f,0000000..a301ac9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@@ -1,45 -1,0 +1,47 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordWithPKDataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
+
+ private final IRecordWithPKDataParser<T> dataParser;
+
+ public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+ final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
++ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
++ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ this.dataParser = dataParser;
+ }
+
+ @Override
+ protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+ dataParser.appendKeys(tb, record);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 370eec0,0000000..aac7be2
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@@ -1,42 -1,0 +1,44 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
+
+ public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+ final FeedLogManager feedLogManager, final int numOfOutputFields,
- final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
++ final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
++ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ }
+
+ @Override
+ protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+ dataParser.appendPK(tb);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 6401234,0000000..a092620
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@@ -1,169 -1,0 +1,172 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nonnull;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.log4j.Logger;
+
+public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+ private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
+ protected final IRecordDataParser<T> dataParser;
+ protected final IRecordReader<? extends T> recordReader;
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+ protected final long interval = 1000;
+ protected boolean failed = false;
+
+ public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+ @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
- @Nonnull IRecordReader<T> recordReader) {
++ @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
+ this.dataParser = dataParser;
+ this.recordReader = recordReader;
+ recordReader.setFeedLogManager(feedLogManager);
+ recordReader.setController(this);
+ }
+
+ @Override
+ public void start(IFrameWriter writer) throws HyracksDataException {
+ HyracksDataException hde = null;
+ try {
+ failed = false;
+ tupleForwarder.initialize(ctx, writer);
+ while (recordReader.hasNext()) {
+ IRawRecord<? extends T> record = recordReader.next();
+ if (record == null) {
+ flush();
+ Thread.sleep(interval);
+ continue;
+ }
+ tb.reset();
+ try {
+ dataParser.parse(record, tb.getDataOutput());
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+ feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+ continue;
+ }
+ tb.addFieldEndOffset();
+ addMetaPart(tb, record);
+ addPrimaryKeys(tb, record);
+ if (tb.getSize() > tupleForwarder.getMaxRecordSize()) {
+ // log
+ feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_LARGE_RECORD);
+ continue;
+ }
+ tupleForwarder.addTuple(tb);
+ }
++ } catch (InterruptedException e) {
++ //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
++ LOGGER.warn("Feed has been interrupted. Closing the feed");
+ } catch (Exception e) {
+ failed = true;
+ tupleForwarder.flush();
+ LOGGER.warn("Failure while operating a feed source", e);
+ throw new HyracksDataException(e);
+ }
+ try {
+ tupleForwarder.close();
+ } catch (Throwable th) {
+ hde = ExternalDataExceptionUtils.suppress(hde, th);
+ }
+ try {
+ recordReader.close();
+ } catch (Throwable th) {
+ LOGGER.warn("Failure during while operating a feed sourcec", th);
+ hde = ExternalDataExceptionUtils.suppress(hde, th);
+ } finally {
+ closeSignal();
+ if (hde != null) {
+ throw hde;
+ }
+ }
+ }
+
+ protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+ }
+
+ protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+ }
+
+ private void closeSignal() {
+ synchronized (closed) {
+ closed.set(true);
+ closed.notifyAll();
+ }
+ }
+
+ private void waitForSignal() throws InterruptedException {
+ synchronized (closed) {
+ while (!closed.get()) {
+ closed.wait();
+ }
+ }
+ }
+
+ @Override
+ public boolean stop() throws HyracksDataException {
+ HyracksDataException hde = null;
+ if (recordReader.stop()) {
+ if (failed) {
+ // failed, close here
+ try {
+ tupleForwarder.close();
+ } catch (Throwable th) {
+ hde = ExternalDataExceptionUtils.suppress(hde, th);
+ }
+ try {
+ recordReader.close();
+ } catch (Throwable th) {
+ hde = ExternalDataExceptionUtils.suppress(hde, th);
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ } else {
+ try {
+ waitForSignal();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ // This is not a parser record. most likely, this error happened in the record reader.
+ return recordReader.handleException(th);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 203b5a7,0000000..e7c396b
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@@ -1,47 -1,0 +1,48 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
+
+ //This field mask a super class field dataParser. We do this to avoid down-casting when calling parseMeta
+ protected RecordWithMetadataParser<T, O> dataParser;
+
+ public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+ FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
- IRecordReader<T> recordReader) {
++ IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ this.dataParser = dataParser;
+ }
+
+ @Override
+ protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+ dataParser.parseMeta(tb.getDataOutput());
+ tb.addFieldEndOffset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index de185e0,0000000..529977a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@@ -1,201 -1,0 +1,229 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
++import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
- import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingScheduler;
++import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
++import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.input.stream.HDFSInputStream;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
++import org.apache.asterix.external.provider.StreamRecordReaderProvider;
++import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
++import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
- public class HDFSDataSourceFactory
- implements IInputStreamFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
++public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
+
+ protected static final long serialVersionUID = 1L;
+ protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+ protected String[] readSchedule;
+ protected boolean read[];
+ protected InputSplitsFactory inputSplitsFactory;
+ protected ConfFactory confFactory;
+ protected boolean configured = false;
+ protected static Scheduler hdfsScheduler;
+ protected static IndexingScheduler indexingScheduler;
+ protected static Boolean initialized = false;
+ protected static Object initLock = new Object();
+ protected List<ExternalFile> files;
+ protected Map<String, String> configuration;
+ protected Class<?> recordClass;
+ protected boolean indexingOp = false;
+ private JobConf conf;
+ private InputSplit[] inputSplits;
+ private String nodeName;
++ private Format format;
+
+ @Override
+ public void configure(Map<String, String> configuration) throws AsterixException {
+ try {
+ init();
+ this.configuration = configuration;
+ JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+ clusterLocations = getPartitionConstraint();
+ int numPartitions = clusterLocations.getLocations().length;
+ // if files list was set, we restrict the splits to the list
+ InputSplit[] inputSplits;
+ if (files == null) {
+ inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ } else {
+ inputSplits = HDFSUtils.getSplits(conf, files);
+ }
+ if (indexingOp) {
+ readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
+ } else {
+ readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
+ }
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+ read = new boolean[readSchedule.length];
+ Arrays.fill(read, false);
- if (!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)) {
++ String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
++ if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
+ RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
+ this.recordClass = reader.createValue().getClass();
+ reader.close();
++ } else {
++ format = StreamRecordReaderProvider.getReaderFormat(configuration);
++ this.recordClass = char[].class;
+ }
+ } catch (IOException e) {
+ throw new AsterixException(e);
+ }
+ }
+
+ // Used to tell the factory to restrict the splits to the intersection between this list a
+ // actual files on hde
+ @Override
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+ this.files = files;
+ this.indexingOp = indexingOp;
+ }
+
+ /*
+ * The method below was modified to take care of the following
+ * 1. when target files are not null, it generates a file aware input stream that validate
+ * against the files
+ * 2. if the data is binary, it returns a generic reader */
- @Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
++ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
++ throws HyracksDataException {
+ try {
+ if (!configured) {
+ conf = confFactory.getConf();
+ inputSplits = inputSplitsFactory.getSplits();
+ nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ configured = true;
+ }
- return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files);
++ return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+ * Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
+ * external
+ * adapter will run and how many threads per node.
+ *
+ * @return
+ */
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+ return clusterLocations;
+ }
+
+ /**
+ * This method initialize the scheduler which assigns responsibility of reading different logical input splits from
+ * HDFS
+ */
+ private static void init() {
+ if (!initialized) {
+ synchronized (initLock) {
+ if (!initialized) {
+ hdfsScheduler = HDFSUtils.initializeHDFSScheduler();
+ indexingScheduler = HDFSUtils.initializeIndexingHDFSScheduler();
+ initialized = true;
+ }
+ }
+ }
+ }
+
+ public JobConf getJobConf() throws HyracksDataException {
+ return confFactory.getConf();
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return ExternalDataUtils.getDataSourceType(configuration);
+ }
+
++ /**
++ * HDFS Datasource is a special case in two ways:
++ * 1. It supports indexing.
++ * 2. It returns input as a set of writable object that we sometimes internally transform into a byte stream
++ * Hence, it can produce:
++ * 1. StreamRecordReader: When we transform the input into a byte stream.
++ * 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
++ * 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
++ */
+ @Override
- public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
++ public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
++ IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
++ if (format != null) {
++ StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format,
++ createInputStream(ctx, partition, indexer), configuration);
++ if (indexer != null) {
++ return new IndexingStreamRecordReader(streamReader, indexer);
++ } else {
++ return streamReader;
++ }
++ }
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
- files == null ? null : ExternalIndexerProvider.getIndexer(configuration));
++ indexer);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return recordClass;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return true;
+ }
+
+ @Override
+ public boolean isIndexingOp() {
+ return ((files != null) && indexingOp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 6964a82,0000000..aa0451a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@@ -1,122 -1,0 +1,121 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.AsterixInputStream;
- import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
- public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
++public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
+
- public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
- super(inputStream, indexer);
++ public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
++ super(inputStream);
+ }
+
+ private boolean prevCharCR;
+ private boolean prevCharLF;
+ private int newlineLength;
+ private int readLength;
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (done) {
+ return false;
+ }
+ if (!skipWhiteSpace()) {
+ done = true;
+ close();
+ return false;
+ }
+ newlineLength = 0;
+ prevCharCR = false;
+ prevCharLF = false;
+ record.reset();
+ readLength = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ if (readLength > 0) {
+ record.endRecord();
+ return true;
+ }
+ close();
+ return false; //EOF
+ }
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for two consecutive newlines
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ if (prevCharLF) {
+ // \n\n
+ ++bufferPosn; // at next invocation proceed from following byte
+ newlineLength = 2;
+ break;
+ } else if (prevCharCR) {
+ newlineLength += 1;
+ }
+ prevCharLF = true;
+ } else {
+ prevCharLF = false;
+ }
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.CR) { //CR + notLF, we are at notLF
+ if (prevCharCR) {
+ // \cr\cr
+ newlineLength = 2;
+ break;
+ }
+ prevCharCR = true;
+ } else {
+ prevCharCR = false;
+ }
+ if (!(prevCharCR || prevCharLF)) {
+ newlineLength = 0;
+ }
+ }
+ readLength = bufferPosn - startPosn;
+ if (readLength > 0) {
+ record.append(inputBuffer, startPosn, readLength);
+ }
+ } while (newlineLength < 2);
+ record.endRecord();
+ return true;
+ }
+
+ private boolean skipWhiteSpace() throws IOException {
+ // start by skipping white spaces
+ while (true) {
+ if (bufferPosn < bufferLength) {
+ if (!Character.isWhitespace(inputBuffer[bufferPosn])) {
+ return true;
+ }
+ bufferPosn++;
+ } else {
+ // fill buffer
+ bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength < 0) {
+ return false;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 3089295,0000000..59b72e4
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@@ -1,114 -1,0 +1,121 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.AsterixInputStream;
- import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
- public class LineRecordReader extends AbstractStreamRecordReader {
++public class LineRecordReader extends StreamRecordReader {
+
++ private final boolean hasHeader;
+ protected boolean prevCharCR;
+ protected int newlineLength;
+ protected int recordNumber = 0;
++ protected boolean nextIsHeader = false;
+
- public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final IExternalIndexer indexer)
- throws HyracksDataException {
- super(stream, indexer);
- try {
- if (hasHeader) {
- if (hasNext()) {
- next();
- }
- }
- } catch (final IOException e) {
- throw new HyracksDataException(e);
++ public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
++ super(stream);
++ this.hasHeader = hasHeader;
++ if (hasHeader) {
++ stream.setNotificationHandler(this);
+ }
++ }
+
++ @Override
++ public void notifyNewSource() {
++ if (hasHeader) {
++ nextIsHeader = true;
++ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
- if (done) {
- return false;
- }
- /*
- * We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to record.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to record, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- newlineLength = 0; //length of terminating newline
- prevCharCR = false; //true of prev char was CR
- record.reset();
- int readLength = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- if (readLength > 0) {
- record.endRecord();
- recordNumber++;
- return true;
++ while (true) {
++ if (done) {
++ return false;
++ }
++ /*
++ * We're reading data from in, but the head of the stream may be
++ * already buffered in buffer, so we have several cases:
++ * 1. No newline characters are in the buffer, so we need to copy
++ * everything and read another buffer from the stream.
++ * 2. An unambiguously terminated line is in buffer, so we just
++ * copy to record.
++ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
++ * in CR. In this case we copy everything up to CR to record, but
++ * we also need to see what follows CR: if it's LF, then we
++ * need consume LF as well, so next call to readLine will read
++ * from after that.
++ * We use a flag prevCharCR to signal if previous character was CR
++ * and, if it happens to be at the end of the buffer, delay
++ * consuming it until we have a chance to look at the char that
++ * follows.
++ */
++ newlineLength = 0; //length of terminating newline
++ prevCharCR = false; //true of prev char was CR
++ record.reset();
++ int readLength = 0;
++ do {
++ int startPosn = bufferPosn; //starting from where we left off the last time
++ if (bufferPosn >= bufferLength) {
++ startPosn = bufferPosn = 0;
++ bufferLength = reader.read(inputBuffer);
++ if (bufferLength <= 0) {
++ if (readLength > 0) {
++ record.endRecord();
++ recordNumber++;
++ return true;
++ }
++ close();
++ return false; //EOF
+ }
- close();
- return false; //EOF
+ }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
++ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
++ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
++ newlineLength = (prevCharCR) ? 2 : 1;
++ ++bufferPosn; // at next invocation proceed from following byte
++ break;
++ }
++ if (prevCharCR) { //CR + notLF, we are at notLF
++ newlineLength = 1;
++ break;
++ }
++ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+ }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
++ readLength = bufferPosn - startPosn;
++ if (prevCharCR && newlineLength == 0) {
++ --readLength; //CR at the end of the buffer
++ prevCharCR = false;
+ }
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
- }
- readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength; //CR at the end of the buffer
- prevCharCR = false;
- }
- if (readLength > 0) {
- record.append(inputBuffer, startPosn, readLength);
++ if (readLength > 0) {
++ record.append(inputBuffer, startPosn, readLength);
++ }
++ } while (newlineLength == 0);
++ if (nextIsHeader) {
++ nextIsHeader = false;
++ continue;
+ }
- } while (newlineLength == 0);
- recordNumber++;
- return true;
++ recordNumber++;
++ return true;
++ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index abd2952,0000000..88964a1
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@@ -1,119 -1,0 +1,125 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class QuotedLineRecordReader extends LineRecordReader {
+
+ private final char quote;
+ private boolean prevCharEscape;
+ private boolean inQuote;
+
- public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream,
- final IExternalIndexer indexer, final String quoteString) throws HyracksDataException {
- super(hasHeader, stream, indexer);
++ public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString)
++ throws HyracksDataException {
++ super(hasHeader, stream);
+ if ((quoteString == null) || (quoteString.length() != 1)) {
+ throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
+ ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+ }
+ this.quote = quoteString.charAt(0);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
- if (done) {
- return false;
- }
- newlineLength = 0;
- prevCharCR = false;
- prevCharEscape = false;
- record.reset();
- int readLength = 0;
- inQuote = false;
- do {
- int startPosn = bufferPosn;
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- {
- if (readLength > 0) {
- if (inQuote) {
- throw new IOException("malformed input record ended inside quote");
++ while (true) {
++ if (done) {
++ return false;
++ }
++ newlineLength = 0;
++ prevCharCR = false;
++ prevCharEscape = false;
++ record.reset();
++ int readLength = 0;
++ inQuote = false;
++ do {
++ int startPosn = bufferPosn;
++ if (bufferPosn >= bufferLength) {
++ startPosn = bufferPosn = 0;
++ bufferLength = reader.read(inputBuffer);
++ if (bufferLength <= 0) {
++ {
++ if (readLength > 0) {
++ if (inQuote) {
++ throw new IOException("malformed input record ended inside quote");
++ }
++ record.endRecord();
++ recordNumber++;
++ return true;
+ }
- record.endRecord();
- recordNumber++;
- return true;
++ close();
++ return false;
+ }
- close();
- return false;
+ }
+ }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) {
- if (!inQuote) {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn;
- break;
- }
- if (prevCharCR) {
- newlineLength = 1;
- break;
- }
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = true;
++ for (; bufferPosn < bufferLength; ++bufferPosn) {
++ if (!inQuote) {
++ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
++ newlineLength = (prevCharCR) ? 2 : 1;
++ ++bufferPosn;
++ break;
++ }
++ if (prevCharCR) {
++ newlineLength = 1;
++ break;
++ }
++ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
++ if (inputBuffer[bufferPosn] == quote) {
++ if (!prevCharEscape) {
++ inQuote = true;
++ }
++ }
++ if (prevCharEscape) {
++ prevCharEscape = false;
++ } else {
++ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
- }
- if (prevCharEscape) {
- prevCharEscape = false;
+ } else {
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
- }
- } else {
- // only look for next quote
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = false;
++ // only look for next quote
++ if (inputBuffer[bufferPosn] == quote) {
++ if (!prevCharEscape) {
++ inQuote = false;
++ }
+ }
++ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
++ readLength = bufferPosn - startPosn;
++ if (prevCharCR && newlineLength == 0) {
++ --readLength;
++ }
++ if (readLength > 0) {
++ record.append(inputBuffer, startPosn, readLength);
++ }
++ } while (newlineLength == 0);
++ if (nextIsHeader) {
++ nextIsHeader = false;
++ continue;
+ }
- readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength;
- }
- if (readLength > 0) {
- record.append(inputBuffer, startPosn, readLength);
- }
- } while (newlineLength == 0);
- recordNumber++;
- return true;
++ recordNumber++;
++ return true;
++ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7339bfd,0000000..26ac3cb
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@@ -1,164 -1,0 +1,164 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+
- import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+
- public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
++public class SemiStructuredRecordReader extends StreamRecordReader {
+
+ private int depth;
+ private boolean prevCharEscape;
+ private boolean inString;
+ private char recordStart;
+ private char recordEnd;
+ private int recordNumber = 0;
+
- public SemiStructuredRecordReader(AsterixInputStream stream, IExternalIndexer indexer, String recStartString,
- String recEndString) throws AsterixException {
- super(stream, indexer);
++ public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString)
++ throws HyracksDataException {
++ super(stream);
+ // set record opening char
+ if (recStartString != null) {
+ if (recStartString.length() != 1) {
- throw new AsterixException(
++ throw new HyracksDataException(
+ ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
+ ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
+ }
+ recordStart = recStartString.charAt(0);
+ } else {
+ recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
+ }
+ // set record ending char
+ if (recEndString != null) {
+ if (recEndString.length() != 1) {
- throw new AsterixException(
++ throw new HyracksDataException(
+ ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
+ ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
+ }
+ recordEnd = recEndString.charAt(0);
+ } else {
+ recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
+ }
+ }
+
+ /**
+ * Returns how many records this reader has produced so far; hasNext() increments the
+ * counter each time it completes a record.
+ */
+ public int getRecordNumber() {
+ return recordNumber;
+ }
+
+ @Override
- public boolean hasNext() throws Exception {
++ public boolean hasNext() throws IOException {
+ if (done) {
+ return false;
+ }
+ record.reset();
+ boolean hasStarted = false;
+ boolean hasFinished = false;
+ prevCharEscape = false;
+ inString = false;
+ depth = 0;
+ do {
+ int startPosn = bufferPosn; // starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength < 0) {
+ close();
+ return false; // EOF
+ }
+ }
+ if (!hasStarted) {
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
+ if (inputBuffer[bufferPosn] == recordStart) {
+ startPosn = bufferPosn;
+ hasStarted = true;
+ depth = 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
+ && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
+ && inputBuffer[bufferPosn] != ExternalDataConstants.LF
+ && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
+ // corrupted file. clear the buffer and stop reading
+ reader.reset();
+ bufferPosn = bufferLength = 0;
+ throw new IOException("Malformed input stream");
+ }
+ }
+ }
+ if (hasStarted) {
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
+ if (inString) {
+ // we are in a string, we only care about the string end
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+ inString = false;
+ }
+ if (prevCharEscape) {
+ prevCharEscape = false;
+ } else {
+ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
+ } else {
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+ inString = true;
+ } else if (inputBuffer[bufferPosn] == recordStart) {
+ depth += 1;
+ } else if (inputBuffer[bufferPosn] == recordEnd) {
+ depth -= 1;
+ if (depth == 0) {
+ hasFinished = true;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ int appendLength = bufferPosn - startPosn;
+ if (appendLength > 0) {
+ try {
+ record.append(inputBuffer, startPosn, appendLength);
+ } catch (IOException e) {
+ reader.reset();
+ bufferPosn = bufferLength = 0;
+ throw new IOException("Malformed input stream");
+ }
+ }
+ } while (!hasFinished);
+ record.endRecord();
+ recordNumber++;
+ return true;
+ }
+
+    /**
+     * Asks the underlying reader to stop.
+     *
+     * @return true if the reader stopped without error; false if it threw (the stack trace is printed)
+     */
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 7ca185f,0000000..541737a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@@ -1,145 -1,0 +1,150 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
- import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.FilterQuery;
+import twitter4j.Status;
+
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
+
+ private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+ private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+
+ private Map<String, String> configuration;
+ private boolean pull;
+ private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
+ /** Reports {@code DataSourceType.RECORDS} for this factory. */
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ /**
+ * Returns the intake partition constraint. The previous {@code clusterLocations} value and
+ * {@code INTAKE_CARDINALITY} (1) are passed to getPartitionConstraints, and the result is stored
+ * back into {@code clusterLocations}.
+ */
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
+ return clusterLocations;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws AsterixException {
+ this.configuration = configuration;
+ TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+ if (!validateConfiguration(configuration)) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("One or more parameters are missing from adapter configuration\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
- builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
++ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+ throw new AsterixException(builder.toString());
+ }
- if (ExternalDataUtils.isPull(configuration)) {
++ if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
+ pull = true;
+ if (configuration.get(SearchAPIConstants.QUERY) == null) {
+ throw new AsterixException(
+ "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
+ }
+ String interval = configuration.get(SearchAPIConstants.INTERVAL);
+ if (interval != null) {
+ try {
+ Integer.parseInt(interval);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException(
+ "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
+ }
+ } else {
+ configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+ + DEFAULT_INTERVAL + ")");
+ }
+ }
- } else if (ExternalDataUtils.isPush(configuration)) {
- pull = false;
+ } else {
- throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
- + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
++ pull = false;
++ }
++ }
++
++    /**
++     * Returns whether the configured reader selects Twitter pull mode, i.e. whether the value
++     * stored under {@code ExternalDataConstants.KEY_READER} equals
++     * {@code ExternalDataConstants.READER_TWITTER_PULL} or
++     * {@code ExternalDataConstants.READER_PULL_TWITTER}.
++     * A missing reader entry yields {@code false} rather than a NullPointerException.
++     *
++     * @param configuration adapter configuration
++     * @return true if the reader name is one of the pull reader names, false otherwise
++     */
++    public static boolean isTwitterPull(Map<String, String> configuration) {
++        String reader = configuration.get(ExternalDataConstants.KEY_READER);
++        // constant-first equals: Map.get returns null when the key is absent
++        return ExternalDataConstants.READER_TWITTER_PULL.equals(reader)
++                || ExternalDataConstants.READER_PULL_TWITTER.equals(reader);
++    }
+
+ /** Always returns false: readers created by this factory are not indexible. */
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+    /**
+     * Creates the Twitter reader chosen by {@link #configure}: a pull reader built from the
+     * search query and interval, or a push reader over the Twitter stream, filtered when
+     * {@code TwitterUtil.getFilterQuery} yields a query.
+     *
+     * @throws HyracksDataException wrapping any AsterixException raised while building the push reader
+     */
+    @Override
+    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        if (!pull) {
+            try {
+                FilterQuery filter = TwitterUtil.getFilterQuery(configuration);
+                if (filter == null) {
+                    return new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration));
+                }
+                return new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), filter);
+            } catch (AsterixException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                configuration.get(SearchAPIConstants.QUERY),
+                Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+    }
+
+ /** Records produced by this factory's readers are twitter4j {@link Status} objects. */
+ @Override
+ public Class<? extends Status> getRecordClass() {
+ return Status.class;
+ }
+
+    /**
+     * Checks that all four OAuth credentials (consumer key/secret, access token/secret)
+     * are present in the configuration.
+     *
+     * @return true only when none of the four values is null
+     */
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        return configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY) != null
+                && configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET) != null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 7e280a5,0000000..94333d1
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@@ -1,120 -1,0 +1,121 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AsterixInputStreamReader extends Reader {
+ private AsterixInputStream in;
+ private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+ private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ private CharsetDecoder decoder;
+ private boolean done = false;
+
+ /**
+ * Wraps {@code in}; its bytes are decoded as UTF-8.
+ */
+ public AsterixInputStreamReader(AsterixInputStream in) {
+ this.in = in;
+ this.decoder = StandardCharsets.UTF_8.newDecoder();
+ // flip() sets the limit to 0, so the byte buffer starts empty and the first read() refills it
+ this.byteBuffer.flip();
+ }
+
+ /**
+ * Stops the underlying stream; any exception it throws is rethrown wrapped in an IOException.
+ */
+ public void stop() throws IOException {
+ try {
+ in.stop();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** Passes the data-flow controller through to the underlying stream. */
+ public void setController(AbstractFeedDataFlowController controller) {
+ in.setController(controller);
+ }
+
- public void setFeedLogManager(FeedLogManager feedLogManager) {
++ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ in.setFeedLogManager(feedLogManager);
+ }
+
+ /** Reads into {@code cbuf} from index 0; equivalent to {@code read(cbuf, 0, cbuf.length)}. */
+ @Override
+ public int read(char cbuf[]) throws IOException {
+ return read(cbuf, 0, cbuf.length);
+ }
+
+    /**
+     * Decodes UTF-8 bytes from the underlying {@link AsterixInputStream} into {@code cbuf},
+     * starting at {@code offset}.
+     * <p>
+     * Bytes that do not yet form a complete character (e.g. a multi-byte UTF-8 sequence split
+     * across two stream reads) stay at the front of {@code bytes}, and the next stream read is
+     * appended right after them, so no input is lost.
+     * <p>
+     * NOTE(review): {@code length} is not used to bound the copy. Up to
+     * {@code ExternalDataConstants.DEFAULT_BUFFER_SIZE} chars can be written starting at
+     * {@code offset}, so callers must size {@code cbuf} accordingly.
+     *
+     * @return the number of chars written, or -1 once the stream is exhausted
+     * @throws IOException if the underlying stream fails, or (as a
+     *         {@link java.nio.charset.CharacterCodingException}) if the input is not valid UTF-8
+     */
+    @Override
+    public int read(char cbuf[], int offset, int length) throws IOException {
+        if (done) {
+            return -1;
+        }
+        charBuffer.clear();
+        while (charBuffer.position() == 0) {
+            if (byteBuffer.hasRemaining()) {
+                // first try to decode bytes left over from a previous stream read
+                decodeAvailable();
+                if (charBuffer.position() > 0) {
+                    break;
+                }
+            }
+            // compact() moves the undecoded tail (if any) to the front of 'bytes' and leaves
+            // position == number of carried-over bytes, limit == capacity
+            byteBuffer.compact();
+            int carried = byteBuffer.position();
+            int len;
+            do {
+                len = in.read(bytes, carried, bytes.length - carried);
+            } while (len == 0);
+            if (len < 0) {
+                done = true;
+                return -1;
+            }
+            // expose the carried-over bytes plus the newly read ones: [0, carried + len)
+            byteBuffer.position(carried + len);
+            byteBuffer.flip();
+            decodeAvailable();
+        }
+        int produced = charBuffer.position();
+        System.arraycopy(charBuffer.array(), 0, cbuf, offset, produced);
+        return produced;
+    }
+
+    /**
+     * Decodes as much of {@code byteBuffer} as possible into {@code charBuffer}. It fails fast
+     * on malformed input that would otherwise never be consumed.
+     */
+    private void decodeAvailable() throws IOException {
+        java.nio.charset.CoderResult result = decoder.decode(byteBuffer, charBuffer, false);
+        if (result.isError() && charBuffer.position() == 0) {
+            // The offending bytes stay in byteBuffer. Without this check, read() would keep
+            // carrying them over until 'bytes' is full.
+            result.throwException();
+        }
+    }
+
+ /** Closes the underlying stream. */
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /**
+ * Delegates exception handling to the underlying stream and returns its answer.
+ * NOTE(review): presumably true means reading can continue; confirm against AsterixInputStream.
+ */
+ public boolean handleException(Throwable th) {
+ return in.handleException(th);
+ }
+
+ /**
+ * Discards any buffered, not-yet-decoded bytes by setting the byte buffer's limit to 0.
+ * NOTE(review): this does not implement Reader's mark/reset contract, and it resets neither
+ * the decoder state nor the {@code done} flag.
+ */
+ @Override
+ public void reset() throws IOException {
+ byteBuffer.limit(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 063b8fa,0000000..997c254
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@@ -1,237 -1,0 +1,234 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
- import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class HDFSInputStream extends AsterixInputStream implements IIndexingDatasource {
+
+ private RecordReader<Object, Text> reader;
+ private Text value = null;
+ private Object key = null;
+ private int currentSplitIndex = 0;
+ private boolean read[];
+ private InputFormat<?, Text> inputFormat;
+ private InputSplit[] inputSplits;
+ private String[] readSchedule;
+ private String nodeName;
+ private JobConf conf;
+ // Indexing variables
+ private final IExternalIndexer indexer;
+ private final List<ExternalFile> snapshot;
+ private final FileSystem hdfs;
+ private int pos = 0;
+
+ @SuppressWarnings("unchecked")
+ public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot)
- throws IOException, AsterixException {
++ JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot, IExternalIndexer indexer)
++ throws IOException, AsterixException {
+ this.read = read;
+ this.inputSplits = inputSplits;
+ this.readSchedule = readSchedule;
+ this.nodeName = nodeName;
+ this.conf = conf;
+ this.inputFormat = conf.getInputFormat();
+ this.reader = new EmptyRecordReader<Object, Text>();
+ this.snapshot = snapshot;
+ this.hdfs = FileSystem.get(conf);
++ this.indexer = indexer;
+ nextInputSplit();
+ this.value = new Text();
+ if (snapshot != null) {
- this.indexer = ExternalIndexerProvider.getIndexer(configuration);
+ if (currentSplitIndex < snapshot.size()) {
+ indexer.reset(this);
+ }
- } else {
- this.indexer = null;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (value.getLength() < pos) {
+ if (!readMore()) {
+ return -1;
+ }
+ } else if (value.getLength() == pos) {
+ pos++;
+ return ExternalDataConstants.BYTE_LF;
+ }
+ return value.getBytes()[pos++];
+ }
+
+ private int readRecord(byte[] buffer, int offset, int len) {
+ int actualLength = value.getLength() + 1;
+ if ((actualLength - pos) > len) {
+ //copy partial record
+ System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+ pos += len;
+ return len;
+ } else {
+ int numBytes = value.getLength() - pos;
+ System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
+ buffer[offset + numBytes] = ExternalDataConstants.LF;
+ pos += numBytes;
+ numBytes++;
+ return numBytes;
+ }
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (value.getLength() > pos) {
+ return readRecord(buffer, offset, len);
+ }
+ if (!readMore()) {
+ return -1;
+ }
+ return readRecord(buffer, offset, len);
+ }
+
+ private boolean readMore() throws IOException {
+ try {
+ pos = 0;
+ return HDFSInputStream.this.hasNext();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean stop() throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ private boolean hasNext() throws Exception {
+ if (reader.next(key, value)) {
+ return true;
+ }
+ while (nextInputSplit()) {
+ if (reader.next(key, value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean nextInputSplit() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read synchronize among
+ * simultaneous partitions in the same machine
+ */
+ synchronized (read) {
+ if (read[currentSplitIndex] == false) {
+ read[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+ if (snapshot != null) {
+ String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
+ FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+ // Skip if not the same file stored in the files snapshot
+ if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
+ .getTime()) {
+ continue;
+ }
+ }
+
+ reader.close();
+ reader = getRecordReader(currentSplitIndex);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException {
+ reader = (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ if (key == null) {
+ key = reader.createKey();
+ value = reader.createValue();
+ }
+ if (indexer != null) {
+ try {
+ indexer.reset(this);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return reader;
+ }
+
+ @Override
+ public IExternalIndexer getIndexer() {
+ return indexer;
+ }
+
+ @Override
+ public List<ExternalFile> getSnapshot() {
+ return snapshot;
+ }
+
+ @Override
+ public int getCurrentSplitIndex() {
+ return currentSplitIndex;
+ }
+
+ @Override
+ public RecordReader<?, ? extends Writable> getReader() {
+ return reader;
+ }
+}