You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:11 UTC
[13/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
deleted file mode 100644
index ca2e7ca..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelper;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-
-/*
- * This operator is intended for using record ids to access data in external sources
- */
-public class ExternalLoopkupOperatorDiscriptor extends AbstractTreeIndexOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- private final IControlledAdapterFactory adapterFactory;
- private final INullWriterFactory iNullWriterFactory;
-
- public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec, IControlledAdapterFactory adapterFactory,
- RecordDescriptor outRecDesc, ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory,
- boolean propagateInput, IIndexLifecycleManagerProvider lcManagerProvider,
- IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider, int datasetId,
- double bloomFilterFalsePositiveRate, ISearchOperationCallbackFactory searchOpCallbackFactory,
- boolean retainNull, INullWriterFactory iNullWriterFactory) {
- super(spec, 1, 1, outRecDesc, storageManager, lcManagerProvider, fileSplitProvider,
- new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS,
- new FilesIndexDescription().FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS,
- externalFilesIndexDataFlowHelperFactory, null, propagateInput, retainNull, iNullWriterFactory, null,
- searchOpCallbackFactory, null);
- this.adapterFactory = adapterFactory;
- this.iNullWriterFactory = iNullWriterFactory;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- // Create a file index accessor to be used for files lookup operations
- // Note that all file index accessors will use partition 0 since we only have 1 files index per NC
- final ExternalFileIndexAccessor fileIndexAccessor = new ExternalFileIndexAccessor(
- (ExternalBTreeDataflowHelper) dataflowHelperFactory.createIndexDataflowHelper(this, ctx, partition),
- this);
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- // The adapter that uses the file index along with the coming tuples to access files in HDFS
- private final IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
- private boolean indexOpen = false;
- private boolean writerOpen = false;
-
- @Override
- public void open() throws HyracksDataException {
- //Open the file index accessor here
- fileIndexAccessor.openIndex();
- indexOpen = true;
- try {
- adapter.initialize(ctx, iNullWriterFactory);
- } catch (Throwable th) {
- // close the files index
- fileIndexAccessor.closeIndex();
- throw new HyracksDataException(th);
- }
- writerOpen = true;
- writer.open();
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- adapter.close(writer);
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- } finally {
- try {
- if (indexOpen) {
- //close the file index
- fileIndexAccessor.closeIndex();
- }
- } finally {
- if (writerOpen) {
- writer.close();
- }
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- try {
- adapter.fail();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- } finally {
- if (writerOpen) {
- writer.fail();
- }
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try {
- adapter.nextFrame(buffer, writer);
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
deleted file mode 100644
index 6f367d2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-
-/*
- * This is a hack used to optain multiple index instances in a single operator and it is not actually used as an operator
- */
-public class IndexInfoOperatorDescriptor implements IIndexOperatorDescriptor{
-
- private static final long serialVersionUID = 1L;
- private final IFileSplitProvider fileSplitProvider;
- private final IStorageManagerInterface storageManager;
- private final IIndexLifecycleManagerProvider lifecycleManagerProvider;
- public IndexInfoOperatorDescriptor(IFileSplitProvider fileSplitProvider,IStorageManagerInterface storageManager,
- IIndexLifecycleManagerProvider lifecycleManagerProvider){
- this.fileSplitProvider = fileSplitProvider;
- this.lifecycleManagerProvider = lifecycleManagerProvider;
- this.storageManager = storageManager;
-
- }
-
- @Override
- public ActivityId getActivityId() {
- return null;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return null;
- }
-
- @Override
- public IFileSplitProvider getFileSplitProvider() {
- return fileSplitProvider;
- }
-
- @Override
- public IStorageManagerInterface getStorageManager() {
- return storageManager;
- }
-
- @Override
- public IIndexLifecycleManagerProvider getLifecycleManagerProvider() {
- return lifecycleManagerProvider;
- }
-
- @Override
- public RecordDescriptor getRecordDescriptor() {
- return null;
- }
-
- @Override
- public IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
- return null;
- }
-
- @Override
- public boolean getRetainInput() {
- return false;
- }
-
- @Override
- public ISearchOperationCallbackFactory getSearchOpCallbackFactory() {
- return null;
- }
-
- @Override
- public IModificationOperationCallbackFactory getModificationOpCallbackFactory() {
- return null;
- }
-
- @Override
- public ITupleFilterFactory getTupleFilterFactory() {
- return null;
- }
-
- @Override
- public ILocalResourceFactoryProvider getLocalResourceFactoryProvider() {
- return null;
- }
-
- @Override
- public boolean getRetainNull() {
- return false;
- }
-
- @Override
- public INullWriterFactory getNullWriterFactory() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
new file mode 100644
index 0000000..7e9fdcb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
+public class HDFSDataSourceFactory
+ implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
+
+ protected static final long serialVersionUID = 1L;
+ protected transient AlgebricksPartitionConstraint clusterLocations;
+ protected String[] readSchedule;
+ protected boolean read[];
+ protected InputSplitsFactory inputSplitsFactory;
+ protected ConfFactory confFactory;
+ protected boolean configured = false;
+ protected static Scheduler hdfsScheduler;
+ protected static IndexingScheduler indexingScheduler;
+ protected static Boolean initialized = false;
+ protected List<ExternalFile> files;
+ protected Map<String, String> configuration;
+ protected Class<?> recordClass;
+ protected boolean indexingOp = false;
+ private JobConf conf;
+ private InputSplit[] inputSplits;
+ private String nodeName;
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ if (!HDFSDataSourceFactory.initialized) {
+ HDFSDataSourceFactory.initialize();
+ }
+ this.configuration = configuration;
+ JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+ clusterLocations = getPartitionConstraint();
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+ // if files list was set, we restrict the splits to the list
+ InputSplit[] inputSplits;
+ if (files == null) {
+ inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ } else {
+ inputSplits = HDFSUtils.getSplits(conf, files);
+ }
+ if (indexingOp) {
+ readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
+ } else {
+ readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
+ }
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+ read = new boolean[readSchedule.length];
+ Arrays.fill(read, false);
+ if (!ExternalDataUtils.isDataSourceStreamProvider(configuration)) {
+ RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
+ this.recordClass = reader.createValue().getClass();
+ reader.close();
+ }
+ }
+
+ // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
+ @Override
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+ this.files = files;
+ this.indexingOp = indexingOp;
+ }
+
+ /*
+ * The method below was modified to take care of the following
+ * 1. when target files are not null, it generates a file aware input stream that validate against the files
+ * 2. if the data is binary, it returns a generic reader
+ */
+ @Override
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
+ if (!configured) {
+ conf = confFactory.getConf();
+ inputSplits = inputSplitsFactory.getSplits();
+ nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ configured = true;
+ }
+ return new HDFSInputStreamProvider<Object>(read, inputSplits, readSchedule, nodeName, conf, configuration,
+ files);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+ * Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
+ * external
+ * adapter will run and how many threads per node.
+ * @return
+ */
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() {
+ clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+ return clusterLocations;
+ }
+
+ /**
+ * This method initialize the scheduler which assigns responsibility of reading different logical input splits from
+ * HDFS
+ */
+ private static void initialize() {
+ synchronized (initialized) {
+ if (!initialized) {
+ hdfsScheduler = HDFSUtils.initializeHDFSScheduler();
+ indexingScheduler = HDFSUtils.initializeIndexingHDFSScheduler();
+ initialized = true;
+ }
+ }
+ }
+
+ public JobConf getJobConf() throws HyracksDataException {
+ return confFactory.getConf();
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return (ExternalDataUtils.isDataSourceStreamProvider(configuration)) ? DataSourceType.STREAM
+ : DataSourceType.RECORDS;
+ }
+
+ @Override
+ public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws Exception {
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ HDFSRecordReader<Object, Writable> recordReader = new HDFSRecordReader<Object, Writable>(read, inputSplits,
+ readSchedule, nodeName, conf);
+ if (files != null) {
+ recordReader.setSnapshot(files);
+ recordReader.setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+ }
+ recordReader.configure(configuration);
+ return recordReader;
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return recordClass;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return true;
+ }
+
+ @Override
+ public boolean isIndexingOp() {
+ return (files != null && indexingOp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
new file mode 100644
index 0000000..fd5c397
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.util.Arrays;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+public class CharArrayRecord implements IRawRecord<char[]> {
+
+ private char[] value;
+ private int size;
+
+ @Override
+ public byte[] getBytes() {
+ return new String(value).getBytes();
+ }
+
+ @Override
+ public char[] get() {
+ return value;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ public CharArrayRecord(int initialCapacity) {
+ value = new char[initialCapacity];
+ size = 0;
+ }
+
+ public CharArrayRecord() {
+ value = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+ size = 0;
+ }
+
+ public void setValue(char[] recordBuffer, int offset, int length) {
+ if (value.length < length) {
+ value = new char[length];
+ }
+ System.arraycopy(recordBuffer, offset, value, 0, length);
+ size = length;
+ }
+
+ private void ensureCapacity(int len) {
+ if (value.length < len) {
+ value = Arrays.copyOf(value, (int) (len * 1.25));
+ }
+ }
+
+ public void append(char[] recordBuffer, int offset, int length) {
+ ensureCapacity(size + length);
+ System.arraycopy(recordBuffer, offset, value, size, length);
+ size += length;
+ }
+
+ @Override
+ public void reset() {
+ size = 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(value, 0, size);
+ }
+
+ public void setValue(char[] value) {
+ this.value = value;
+ }
+
+ public void endRecord() {
+ if (value[size - 1] != ExternalDataConstants.LF) {
+ appendChar(ExternalDataConstants.LF);
+ }
+ }
+
+ private void appendChar(char c) {
+ ensureCapacity(size + 1);
+ value[size] = c;
+ size++;
+ }
+
+ @Override
+ public Class<char[]> getRecordClass() {
+ return char[].class;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
new file mode 100644
index 0000000..365bc22
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import org.apache.asterix.external.api.IRawRecord;
+
+public class GenericRecord<T> implements IRawRecord<T> {
+
+ private T record;
+
+ public GenericRecord() {
+ }
+
+ public GenericRecord(T record) {
+ this.record = record;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return null;
+ }
+
+ @Override
+ public T get() {
+ return record;
+ }
+
+ @Override
+ public int size() {
+ return -1;
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return record.getClass();
+ }
+
+ public void set(T record) {
+ this.record = record;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
new file mode 100644
index 0000000..1b84e7a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+public abstract class AbstractCharRecordLookupReader extends AbstractHDFSLookupRecordReader<char[]> {
+ public AbstractCharRecordLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+ Configuration conf) {
+ super(snapshotAccessor, fs, conf);
+ }
+
+ protected CharArrayRecord record = new CharArrayRecord();
+ protected Text value = new Text();
+ protected CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+ protected ByteBuffer reusableByteBuffer = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ protected CharBuffer reusableCharBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+
+ @Override
+ public Class<?> getRecordClass() throws IOException {
+ return char[].class;
+ }
+
+ @Override
+ protected IRawRecord<char[]> lookup(RecordId rid) throws IOException {
+ record.reset();
+ readRecord(rid);
+ writeRecord();
+ return record;
+ }
+
+ protected abstract void readRecord(RecordId rid) throws IOException;
+
+ private void writeRecord() {
+ reusableByteBuffer.clear();
+ if (reusableByteBuffer.remaining() < value.getLength()) {
+ reusableByteBuffer = ByteBuffer
+ .allocateDirect(value.getLength() + ExternalDataConstants.DEFAULT_BUFFER_INCREMENT);
+ }
+ reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
+ reusableByteBuffer.flip();
+ while (reusableByteBuffer.hasRemaining()) {
+ decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
+ record.append(reusableCharBuffer.array(), 0, reusableCharBuffer.position());
+ reusableCharBuffer.clear();
+ }
+ record.endRecord();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
new file mode 100644
index 0000000..5a20962
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for record readers that look up individual records in external
+ * HDFS files by {@link RecordId}. Subclasses implement how a file is opened
+ * and closed and how a single record is fetched from the open file.
+ */
+public abstract class AbstractHDFSLookupRecordReader<T> implements ILookupRecordReader<T> {
+
+ // id of the currently open file (-1 means no file has been opened yet)
+ protected int fileId;
+ private ExternalFileIndexAccessor snapshotAccessor;
+ protected ExternalFile file;
+ protected FileSystem fs;
+ protected Configuration conf;
+ // true when the on-disk file no longer matches the indexed snapshot
+ protected boolean replaced;
+
+ public AbstractHDFSLookupRecordReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+ Configuration conf) {
+ this.snapshotAccessor = snapshotAccessor;
+ this.fs = fs;
+ this.conf = conf;
+ this.fileId = -1;
+ this.file = new ExternalFile();
+ }
+
+ @Override
+ public void configure(Map<String, String> configurations) throws Exception {
+ // no configuration is needed at this level
+ }
+
+ @Override
+ public IRawRecord<T> read(RecordId rid) throws Exception {
+ if (rid.getFileId() != fileId) {
+ // close current file
+ closeFile();
+ // lookup new file
+ snapshotAccessor.lookup(rid.getFileId(), file);
+ fileId = rid.getFileId();
+ try {
+ // Validate both before and after opening: the file may be replaced
+ // between the first check and the open, in which case the freshly
+ // opened handle must be discarded again.
+ validate();
+ if (!replaced) {
+ openFile();
+ validate();
+ if (replaced) {
+ closeFile();
+ }
+ }
+ } catch (FileNotFoundException e) {
+ // a deleted file is treated the same as a replaced one
+ replaced = true;
+ }
+ }
+ if (replaced) {
+ // the file no longer matches the snapshot; the record is unavailable
+ return null;
+ }
+ return lookup(rid);
+ }
+
+ // Fetches the record with the given id from the currently open file.
+ protected abstract IRawRecord<T> lookup(RecordId rid) throws IOException;
+
+ private void validate() throws IllegalArgumentException, IOException {
+ // A modification-time mismatch means the file was overwritten after the
+ // snapshot was taken.
+ FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
+ replaced = fileStatus.getModificationTime() != file.getLastModefiedTime().getTime();
+ }
+
+ protected abstract void closeFile();
+
+ protected abstract void openFile() throws IllegalArgumentException, IOException;
+
+ @Override
+ public final void open() throws HyracksDataException {
+ snapshotAccessor.open();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // always release the snapshot accessor, even if closing the file fails
+ try {
+ closeFile();
+ } finally {
+ snapshotAccessor.close();
+ }
+ }
+
+ @Override
+ public void fail() {
+ // nothing to clean up on failure
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
new file mode 100644
index 0000000..3b59b98
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.AInputStreamReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * Base class for record readers that scan a character input stream record by
+ * record. Subclasses fill {@code record} from {@code inputBuffer} in their
+ * {@code hasNext()} implementation; {@code next()} simply hands it out.
+ */
+public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+
+ // shared scanning state for subclasses
+ protected AInputStreamReader reader;
+ protected CharArrayRecord record;
+ protected char[] inputBuffer;
+ protected int bufferLength = 0;
+ protected int bufferPosn = 0;
+ protected IExternalIndexer indexer;
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.record = new CharArrayRecord();
+ this.inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+ }
+
+ public void setInputStream(AInputStream inputStream) throws IOException {
+ reader = new AInputStreamReader(inputStream);
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException {
+ // the record was populated by the subclass's hasNext()
+ return record;
+ }
+
+ @Override
+ public Class<char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public IExternalIndexer getIndexer() {
+ return indexer;
+ }
+
+ @Override
+ public void setIndexer(IExternalIndexer indexer) {
+ this.indexer = indexer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
new file mode 100644
index 0000000..c7acb1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base factory for stream-backed record readers. Wraps an
+ * {@link IInputStreamProviderFactory} and wires its streams (and, when
+ * indexing, its external indexer) into the concrete reader.
+ */
+public abstract class AbstractStreamRecordReaderFactory<T>
+ implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
+
+ private static final long serialVersionUID = 1L;
+ protected IInputStreamProviderFactory inputStreamFactory;
+ protected Map<String, String> configuration;
+
+ // Fluent setter; returns this so calls can be chained.
+ public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
+ IInputStreamProviderFactory inputStreamFactory) {
+ this.inputStreamFactory = inputStreamFactory;
+ return this;
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ // partitioning is dictated by the underlying stream factory
+ return inputStreamFactory.getPartitionConstraint();
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ // configure the stream factory first; subclasses may depend on it
+ this.configuration = configuration;
+ inputStreamFactory.configure(configuration);
+ configureStreamReaderFactory(configuration);
+ }
+
+ // Hook for subclasses to apply reader-specific configuration.
+ protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
+
+ @Override
+ public boolean isIndexible() {
+ return inputStreamFactory.isIndexible();
+ }
+
+ @Override
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
+ // NOTE(review): assumes the stream factory is indexible when this is
+ // called (callers are expected to check isIndexible() first).
+ ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
+ }
+
+ @Override
+ public boolean isIndexingOp() {
+ if (inputStreamFactory.isIndexible()) {
+ return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
+ }
+ return false;
+ }
+
+ // Creates the partition's input stream, attaches the indexer when this is an
+ // indexing operation, and configures the reader before returning it.
+ protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
+ int partition) throws Exception {
+ IInputStreamProvider inputStreamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
+ IExternalIndexer indexer = null;
+ if (inputStreamFactory.isIndexible()) {
+ if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
+ indexer = ((IIndexingDatasource) inputStreamProvider).getIndexer();
+ }
+ }
+ recordReader.setInputStream(inputStreamProvider.getInputStream());
+ recordReader.setIndexer(indexer);
+ recordReader.configure(configuration);
+ return recordReader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
new file mode 100644
index 0000000..e742b1e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A no-op {@link RecordReader}: it produces no key/value pairs and reports
+ * zero position and progress. Serves as a placeholder until a real reader
+ * for an input split is installed.
+ */
+public class EmptyRecordReader<K, V> implements RecordReader<K, V> {
+
+ @Override
+ public K createKey() {
+ return null;
+ }
+
+ @Override
+ public V createValue() {
+ return null;
+ }
+
+ @Override
+ public boolean next(K key, V value) throws IOException {
+ // there is never a next record
+ return false;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // nothing to close
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
new file mode 100644
index 0000000..d88f967
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reads records from the HDFS input splits scheduled to this node, opening
+ * one Hadoop {@link RecordReader} per split. The shared {@code read} array
+ * coordinates which splits have been claimed by partitions on this machine.
+ */
+public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Writable>, IIndexingDatasource {
+
+ protected RecordReader<K, Writable> reader;
+ protected V value = null;
+ protected K key = null;
+ protected int currentSplitIndex = 0;
+ // shared claim flags: read[i] is true once split i has been taken
+ protected boolean read[];
+ protected InputFormat<?, ?> inputFormat;
+ protected InputSplit[] inputSplits;
+ // readSchedule[i] names the node assigned to split i
+ protected String[] readSchedule;
+ protected String nodeName;
+ protected JobConf conf;
+ protected GenericRecord<Writable> record;
+ // Indexing variables
+ protected IExternalIndexer indexer;
+ protected List<ExternalFile> snapshot;
+ protected FileSystem hdfs;
+
+ public HDFSRecordReader(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+ JobConf conf) {
+ this.read = read;
+ this.inputSplits = inputSplits;
+ this.readSchedule = readSchedule;
+ this.nodeName = nodeName;
+ this.conf = conf;
+ this.inputFormat = conf.getInputFormat();
+ // placeholder so reader.close()/next() are safe before the first split opens
+ this.reader = new EmptyRecordReader<K, Writable>();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ record = new GenericRecord<Writable>();
+ nextInputSplit();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ // try the current split first, then advance through the remaining ones
+ if (reader.next(key, value)) {
+ return true;
+ }
+ while (nextInputSplit()) {
+ if (reader.next(key, value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public IRawRecord<Writable> next() throws IOException {
+ // 'value' was filled by the last successful hasNext() call
+ record.set(value);
+ return record;
+ }
+
+ @Override
+ public Class<? extends Writable> getRecordClass() throws IOException {
+ // 'value' is created lazily when the first split opens; returns null if
+ // no split is assigned to this node at all
+ if (value == null) {
+ if (!nextInputSplit()) {
+ return null;
+ }
+ }
+ return value.getClass();
+ }
+
+ private boolean nextInputSplit() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * only consider splits scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * claim an unread split; synchronize so that simultaneous
+ * partitions on the same machine never read the same split twice
+ */
+ synchronized (read) {
+ if (read[currentSplitIndex] == false) {
+ read[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+ if (snapshot != null) {
+ String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
+ FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+ // Skip if not the same file stored in the files snapshot
+ if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
+ .getTime())
+ continue;
+ }
+
+ // release the previous split's reader before opening the next one
+ reader.close();
+ reader = getRecordReader(currentSplitIndex);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private RecordReader<K, Writable> getRecordReader(int splitIndex) throws IOException {
+ reader = (RecordReader<K, Writable>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ // key/value holders are created once, on the first opened split
+ if (key == null) {
+ key = reader.createKey();
+ value = (V) reader.createValue();
+ }
+ // let the indexer track the newly opened split
+ if (indexer != null) {
+ try {
+ indexer.reset(this);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return reader;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public IExternalIndexer getIndexer() {
+ return indexer;
+ }
+
+ @Override
+ public void setIndexer(IExternalIndexer indexer) {
+ this.indexer = indexer;
+ }
+
+ public List<ExternalFile> getSnapshot() {
+ return snapshot;
+ }
+
+ // Installing a snapshot enables skipping files that were replaced since
+ // indexing; it also opens an HDFS client for the modification-time checks.
+ public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
+ this.snapshot = snapshot;
+ hdfs = FileSystem.get(conf);
+ }
+
+ public int getCurrentSplitIndex() {
+ return currentSplitIndex;
+ }
+
+ public RecordReader<K, Writable> getReader() {
+ return reader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
new file mode 100644
index 0000000..9466a96
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A buffered line reader over an HDFS {@link FSDataInputStream}. Tracks the
+ * logical file position of the next unconsumed byte ({@code currentFilePos})
+ * so that {@link #seek(long)} can be answered from the buffer when possible.
+ */
+public class HDFSTextLineReader {
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private FSDataInputStream reader;
+
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer
+ private int bufferLength = 0;
+ // the current position in the buffer
+ private int bufferPosn = 0;
+
+ // file offset of the next byte to be consumed (start of buffer + bufferPosn)
+ private long currentFilePos = 0L;
+
+ private static final byte CR = '\r';
+ private static final byte LF = '\n';
+
+ public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * default buffer-size (32k).
+ *
+ * @param in
+ * The input stream
+ * @throws IOException
+ */
+ public HDFSTextLineReader(FSDataInputStream in) throws IOException {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size.
+ *
+ * @param in
+ * The input stream
+ * @param bufferSize
+ * Size of the read buffer
+ * @throws IOException
+ */
+ public HDFSTextLineReader(FSDataInputStream in, int bufferSize) throws IOException {
+ this.reader = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ currentFilePos = in.getPos();
+ }
+
+ // No-stream constructor; a stream must be supplied later via resetReader().
+ public HDFSTextLineReader() throws IOException {
+ this.bufferSize = DEFAULT_BUFFER_SIZE;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
+ *
+ * @param in
+ * input stream
+ * @param conf
+ * configuration
+ * @throws IOException
+ */
+ public HDFSTextLineReader(FSDataInputStream in, Configuration conf) throws IOException {
+ this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+ }
+
+ /**
+ * Read one line from the InputStream into the given Text. A line
+ * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+ * line.
+ *
+ * @param str
+ * the object to store the given line (without newline)
+ * @param maxLineLength
+ * the maximum number of bytes to store into str;
+ * the rest of the line is silently discarded.
+ * @param maxBytesToConsume
+ * the maximum number of bytes to consume
+ * in this call. This is only a hint, because if the line cross
+ * this threshold, we allow it to happen. It can overshoot
+ * potentially by as much as one buffer length.
+ * @return the number of bytes read including the (longest) newline
+ * found.
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
+ /* We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to str.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to str, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need consume LF as well, so next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
+ */
+ str.clear();
+ int txtLength = 0; //tracks str.getLength(), as an optimization
+ int newlineLength = 0; //length of terminating newline
+ boolean prevCharCR = false; //true of prev char was CR
+ long bytesConsumed = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ if (prevCharCR)
+ ++bytesConsumed; //account for CR from previous read
+ bufferLength = reader.read(buffer);
+ if (bufferLength <= 0)
+ break; // EOF
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (buffer[bufferPosn] == LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (buffer[bufferPosn] == CR);
+ }
+ int readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0)
+ --readLength; //CR at the end of the buffer
+ bytesConsumed += readLength;
+ int appendLength = readLength - newlineLength;
+ if (appendLength > maxLineLength - txtLength) {
+ appendLength = maxLineLength - txtLength;
+ }
+ if (appendLength > 0) {
+ str.append(buffer, startPosn, appendLength);
+ txtLength += appendLength;
+ }
+ } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+ if (bytesConsumed > Integer.MAX_VALUE)
+ throw new IOException("Too many bytes before newline: " + bytesConsumed);
+ // keep the logical position in sync with how far we consumed the buffer
+ currentFilePos = reader.getPos() - bufferLength + bufferPosn;
+ return (int) bytesConsumed;
+ }
+
+ /**
+ * Read from the InputStream into the given Text.
+ *
+ * @param str
+ * the object to store the given line
+ * @param maxLineLength
+ * the maximum number of bytes to store into str.
+ * @return the number of bytes read including the newline
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read from the InputStream into the given Text.
+ *
+ * @param str
+ * the object to store the given line
+ * @return the number of bytes read including the newline
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
+ public void seek(long desired) throws IOException {
+ // reader.getPos() is the offset just past the buffered data, so the first
+ // branch covers targets outside the buffered window (ahead of it, or
+ // behind the already-consumed position).
+ if (reader.getPos() <= desired || currentFilePos > desired) {
+ // desired position is ahead of stream or before the current position, seek to position
+ reader.seek(desired);
+ bufferLength = 0;
+ bufferPosn = 0;
+ currentFilePos = desired;
+ } else if (currentFilePos < desired) {
+ // desired position is in the buffer
+ int difference = (int) (desired - currentFilePos);
+ bufferPosn += difference;
+ currentFilePos = desired;
+ }
+ }
+
+ public FSDataInputStream getReader() {
+ return reader;
+ }
+
+ // Swap in a new stream and discard any buffered data from the old one.
+ public void resetReader(FSDataInputStream reader) throws IOException {
+ this.reader = reader;
+ bufferLength = 0;
+ bufferPosn = 0;
+ currentFilePos = reader.getPos();
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
new file mode 100644
index 0000000..9b11df6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+/**
+ * Splits a character stream into one record per line. Lines may end in LF,
+ * CR, or CR+LF; EOF terminates an otherwise unterminated last line. When the
+ * configuration declares a header, the first line is consumed and discarded.
+ */
+public class LineRecordReader extends AbstractStreamRecordReader {
+
+ protected boolean prevCharCR;
+ protected int newlineLength;
+ // count of records produced so far (header line included, if skipped)
+ protected int recordNumber = 0;
+
+ @Override
+ public boolean hasNext() throws IOException {
+ /* We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to record.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to record, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need consume LF as well, so next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
+ */
+ newlineLength = 0; //length of terminating newline
+ prevCharCR = false; //true of prev char was CR
+ record.reset();
+ int readLength = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ // EOF: emit any partial final line, then close the stream
+ if (readLength > 0) {
+ record.endRecord();
+ recordNumber++;
+ return true;
+ }
+ reader.close();
+ return false; //EOF
+ }
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+ }
+ readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0) {
+ --readLength; //CR at the end of the buffer
+ }
+ if (readLength > 0) {
+ record.append(inputBuffer, startPosn, readLength);
+ }
+ } while (newlineLength == 0);
+ recordNumber++;
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ super.configure(configuration);
+ // consume and discard the header line, if one is declared
+ if (ExternalDataUtils.hasHeader(configuration)) {
+ if (hasNext()) {
+ next();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
new file mode 100644
index 0000000..3a82a68
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+
+public class LookupReaderFactoryProvider {
+
+ @SuppressWarnings("rawtypes")
+ public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
+ String inputFormat = HDFSUtils.getInputFormatClassName(configuration);
+ if (inputFormat.equals(ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT)
+ || inputFormat.equals(ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)
+ || inputFormat.equals(ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
+ HDFSLookupReaderFactory<Object> readerFactory = new HDFSLookupReaderFactory<Object>();
+ readerFactory.configure(configuration);
+ return readerFactory;
+ } else {
+ throw new AsterixException("Unrecognized external format");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
new file mode 100644
index 0000000..668876e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+public class QuotedLineRecordReader extends LineRecordReader {
+
+ private char quote;
+ private boolean prevCharEscape;
+ private boolean inQuote;
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ super.configure(configuration);
+ String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+ if (quoteString == null || quoteString.length() != 1) {
+ throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+ ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+ }
+ this.quote = quoteString.charAt(0);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ newlineLength = 0;
+ prevCharCR = false;
+ prevCharEscape = false;
+ record.reset();
+ int readLength = 0;
+ inQuote = false;
+ do {
+ int startPosn = bufferPosn;
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ {
+ if (readLength > 0) {
+ if (inQuote) {
+ throw new IOException("malformed input record ended inside quote");
+ }
+ record.endRecord();
+ recordNumber++;
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) {
+ if (!inQuote) {
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn;
+ break;
+ }
+ if (prevCharCR) {
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+ if (inputBuffer[bufferPosn] == quote) {
+ if (!prevCharEscape) {
+ inQuote = true;
+ }
+ }
+ if (prevCharEscape) {
+ prevCharEscape = false;
+ } else {
+ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
+ } else {
+ // only look for next quote
+ if (inputBuffer[bufferPosn] == quote) {
+ if (!prevCharEscape) {
+ inQuote = false;
+ }
+ }
+ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
+ }
+ readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0) {
+ --readLength;
+ }
+ if (readLength > 0) {
+ record.append(inputBuffer, startPosn, readLength);
+ }
+ } while (newlineLength == 0);
+ recordNumber++;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
new file mode 100644
index 0000000..5c33502
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
/**
 * Looks up individual records inside HDFS RC files, addressed by the
 * (offset, row) pair carried in a {@link RecordId}. Keeps one open
 * {@link Reader} per file and reuses the buffered row group when consecutive
 * lookups hit the same offset.
 */
public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
    public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
        super(snapshotAccessor, fs, conf);
    }

    private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
    // Current RCFile reader; null until openFile() is called.
    private Reader reader;
    private LongWritable key = new LongWritable();
    private BytesRefArrayWritable value = new BytesRefArrayWritable();
    private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
    // Offset of the row group currently buffered; -1 means none (set in openFile).
    private long offset;
    // Index of the last row consumed within the current group; -1 means none.
    private int row;

    @Override
    public Class<?> getRecordClass() throws IOException {
        // NOTE(review): returns Writable.class rather than
        // BytesRefArrayWritable.class -- confirm callers only require the
        // Writable supertype.
        return Writable.class;
    }

    @Override
    protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
        // Reposition the reader only when the requested row group differs from
        // the one currently buffered.
        if (rid.getOffset() != offset) {
            offset = rid.getOffset();
            if (reader.getPosition() != offset)
                reader.seek(offset);
            reader.resetBuffer();
            row = -1;
        }

        // skip rows to the record row
        // NOTE(review): assumes rid.getRow() exists within the file; the return
        // value of reader.next() is not checked, so an out-of-range row would
        // not be detected here -- TODO confirm upstream guarantees.
        while (row < rid.getRow()) {
            reader.next(key);
            reader.getCurrentRow(value);
            row++;
        }
        record.set(value);
        return record;
    }

    @Override
    protected void closeFile() {
        // Best-effort close; failures are logged, not propagated.
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (Exception e) {
            LOGGER.warn("Error closing HDFS file", e);
        }
    }

    @Override
    protected void openFile() throws IllegalArgumentException, IOException {
        reader = new Reader(fs, new Path(file.getFileName()), conf);
        // Invalidate the cached position so the first lookup always seeks.
        offset = -1;
        row = -1;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
new file mode 100644
index 0000000..1c2dc30
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.log4j.Logger;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherException;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+import com.sun.syndication.io.FeedException;
+
/**
 * A record reader that pulls entries from an RSS feed. Fetched entries are
 * buffered in a queue; the {@code modified} flag is toggled by the attached
 * {@link FetcherEventListenerImpl} during {@code retrieveFeed} and gates
 * whether newly retrieved entries are added to the buffer.
 */
public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {

    private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
    // Set (via the fetcher listener) when the remote feed was retrieved.
    private boolean modified = false;
    // Entries fetched but not yet handed to the caller.
    private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
    private FeedFetcherCache feedInfoCache;
    private FeedFetcher fetcher;
    private FetcherEventListenerImpl listener;
    private URL feedUrl;
    // Reused wrapper returned by next(); its payload is replaced on each call.
    private GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
    // Set by stop(); once true, hasNext() is false and next() returns null.
    private boolean done = false;

    public RSSRecordReader(String url) throws MalformedURLException {
        feedUrl = new URL(url);
        feedInfoCache = HashMapFeedInfoCache.getInstance();
        fetcher = new HttpURLFeedFetcher(feedInfoCache);
        listener = new FetcherEventListenerImpl(this, LOGGER);
        fetcher.addFetcherEventListener(listener);
    }

    public boolean isModified() {
        return modified;
    }

    @Override
    public void close() throws IOException {
        // Detach the listener; the fetcher itself has no close hook here.
        fetcher.removeFetcherEventListener(listener);
    }

    @Override
    public void configure(Map<String, String> configurations) throws Exception {
        // No configurable parameters; the feed URL is supplied at construction.
    }

    @Override
    public boolean hasNext() throws Exception {
        // True until stop() is called; an empty feed still reports true and
        // next() may then return null.
        return !done;
    }

    /**
     * Returns the next feed entry, or {@code null} if stopped or if the feed
     * currently has no (new) entries.
     */
    @Override
    public IRawRecord<SyndEntryImpl> next() throws IOException {
        if (done) {
            return null;
        }
        try {
            SyndEntryImpl feedEntry;
            feedEntry = getNextRSSFeed();
            if (feedEntry == null) {
                return null;
            }
            record.set(feedEntry);
            return record;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public Class<SyndEntryImpl> getRecordClass() throws IOException {
        return SyndEntryImpl.class;
    }

    @Override
    public boolean stop() {
        done = true;
        return true;
    }

    public void setModified(boolean modified) {
        this.modified = modified;
    }

    // Refetch only when the local buffer has been drained.
    private SyndEntryImpl getNextRSSFeed() throws Exception {
        if (rssFeedBuffer.isEmpty()) {
            fetchFeed();
        }
        if (rssFeedBuffer.isEmpty()) {
            return null;
        } else {
            return rssFeedBuffer.remove();
        }
    }

    @SuppressWarnings("unchecked")
    private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
        // Retrieve the feed.
        // We will get a Feed Polled Event and then a
        // Feed Retrieved event (assuming the feed is valid)
        SyndFeed feed = fetcher.retrieveFeed(feedUrl);
        // retrieveFeed fires listener events which update 'modified' before the
        // check below runs.
        if (modified) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(feedUrl + " retrieved");
                LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + feed.getEntries().size()
                        + " entries.");
            }
            List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
            rssFeedBuffer.addAll(fetchedFeeds);
        }
    }
}
+
+class FetcherEventListenerImpl implements FetcherListener {
+
+ private RSSRecordReader reader;
+ private Logger LOGGER;
+
+ public FetcherEventListenerImpl(RSSRecordReader reader, Logger LOGGER) {
+ this.reader = reader;
+ this.LOGGER = LOGGER;
+ }
+
+ /**
+ * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+ */
+ @Override
+ public void fetcherEvent(FetcherEvent event) {
+ String eventType = event.getEventType();
+ if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
+ }
+ } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
+ }
+ (reader).setModified(true);
+ } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
+ }
+ (reader).setModified(true);
+ }
+ }
+}