You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:52:16 UTC
[3/5] incubator-asterixdb git commit: Add flush() to IFrameWriter
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
new file mode 100644
index 0000000..28abddb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for readers that fetch single records from HDFS files by {@link RecordId}.
+ * <p>
+ * At most one file is open at a time. When a lookup targets a different file than the previous one, the
+ * current file is closed, the new file's metadata is fetched through the files-index snapshot accessor,
+ * and the file is opened only if its HDFS modification time still equals the one recorded in the
+ * snapshot. Lookups into files that were modified or deleted since the snapshot return {@code null}.
+ *
+ * @param <T>
+ *            the payload type of the returned raw records
+ */
+public abstract class AbstractHDFSLookupRecordReader<T> implements ILookupRecordReader<T> {
+
+    // id of the file lookups are currently served from; -1 means "no file selected"
+    protected int fileId;
+    private final ExternalFileIndexAccessor snapshotAccessor;
+    // metadata of the current file, populated via snapshotAccessor.lookup(...)
+    protected ExternalFile file;
+    protected FileSystem fs;
+    protected Configuration conf;
+    // true when the current file no longer matches the snapshot (modified or missing)
+    protected boolean replaced;
+
+    public AbstractHDFSLookupRecordReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+            Configuration conf) {
+        this.snapshotAccessor = snapshotAccessor;
+        this.fs = fs;
+        this.conf = conf;
+        this.fileId = -1;
+        this.file = new ExternalFile();
+    }
+
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    /**
+     * Returns the record identified by {@code rid}, switching files first if needed.
+     *
+     * @return the record, or {@code null} if its file was modified or deleted since the snapshot
+     */
+    @Override
+    public IRawRecord<T> read(RecordId rid) throws Exception {
+        if (rid.getFileId() != fileId) {
+            switchToFile(rid.getFileId());
+        }
+        if (replaced) {
+            return null;
+        }
+        return lookup(rid);
+    }
+
+    /**
+     * Closes the current file and makes {@code newFileId} current, opening it only if it still matches
+     * the snapshot. {@link #fileId} is invalidated before any step that can fail and is only committed
+     * once the switch completed, so a failed switch never leaves an id that routes later lookups to a
+     * closed or never-opened file.
+     */
+    private void switchToFile(int newFileId) throws Exception {
+        closeFile();
+        fileId = -1;
+        snapshotAccessor.lookup(newFileId, file);
+        try {
+            validate();
+            if (!replaced) {
+                openFile();
+                // re-check after opening: the file may have changed between the first check and the open
+                validate();
+                if (replaced) {
+                    closeFile();
+                }
+            }
+        } catch (FileNotFoundException e) {
+            // the file is gone: treat it like a replaced file
+            replaced = true;
+        }
+        fileId = newFileId;
+    }
+
+    protected abstract IRawRecord<T> lookup(RecordId rid) throws IOException;
+
+    /**
+     * Sets {@link #replaced} by comparing the HDFS modification time of the current file with the
+     * last-modified time stored in the snapshot.
+     */
+    private void validate() throws IllegalArgumentException, IOException {
+        FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
+        replaced = fileStatus.getModificationTime() != file.getLastModefiedTime().getTime();
+    }
+
+    protected abstract void closeFile();
+
+    protected abstract void openFile() throws IllegalArgumentException, IOException;
+
+    @Override
+    public final void open() throws HyracksDataException {
+        snapshotAccessor.open();
+    }
+
+    /**
+     * Closes the current file, then the snapshot accessor (the latter even if closing the file fails).
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            closeFile();
+        } finally {
+            snapshotAccessor.close();
+        }
+    }
+
+    @Override
+    public void fail() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
new file mode 100644
index 0000000..c302b9b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+/**
+ * Creates HDFS lookup readers. The concrete reader (text, sequence or RC file) is selected by the
+ * {@link ExternalDataConstants#KEY_INPUT_FORMAT} configuration parameter.
+ *
+ * @param <T>
+ *            the payload type of the records produced by the created readers
+ */
+public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
+
+    protected static final long serialVersionUID = 1L;
+    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected ConfFactory confFactory;
+    protected Map<String, String> configuration;
+
+    public HDFSLookupReaderFactory() {
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+        return clusterLocations;
+    }
+
+    /**
+     * Stores the configuration and derives the Hadoop job configuration from it.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates the lookup reader matching the configured input format.
+     *
+     * @throws AsterixException
+     *             if the input-format parameter is missing or not recognised
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
+            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT);
+        if (inputFormatParameter == null) {
+            // fail with a clear message instead of a NullPointerException from trim()
+            throw new AsterixException("Missing required parameter: " + ExternalDataConstants.KEY_INPUT_FORMAT);
+        }
+        inputFormatParameter = inputFormatParameter.trim();
+        JobConf conf = confFactory.getConf();
+        FileSystem fs = FileSystem.get(conf);
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
+            default:
+                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
new file mode 100644
index 0000000..564d55a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.reader.EmptyRecordReader;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reads records from the HDFS input splits that the read schedule assigns to this node.
+ * <p>
+ * Partitions running on the same node share the {@code read} flags; a split is claimed by setting its
+ * flag while holding the array's monitor, so at most one of those partitions reads it. When a files
+ * snapshot has been set, splits whose file modification time differs from the snapshot are skipped.
+ */
+public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Writable>, IIndexingDatasource {
+
+    protected RecordReader<K, Writable> reader;
+    protected V value = null;
+    protected K key = null;
+    // index of the split currently being read (or being considered by nextInputSplit)
+    protected int currentSplitIndex = 0;
+    // per-split "already claimed" flags, shared among partitions on this node
+    protected boolean[] read;
+    protected InputFormat<?, ?> inputFormat;
+    protected InputSplit[] inputSplits;
+    protected String[] readSchedule;
+    protected String nodeName;
+    protected JobConf conf;
+    protected GenericRecord<Writable> record;
+    // Indexing variables
+    protected IExternalIndexer indexer;
+    protected List<ExternalFile> snapshot;
+    protected FileSystem hdfs;
+
+    public HDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf) {
+        this.read = read;
+        this.inputSplits = inputSplits;
+        this.readSchedule = readSchedule;
+        this.nodeName = nodeName;
+        this.conf = conf;
+        this.inputFormat = conf.getInputFormat();
+        // placeholder so close()/next() are safe before the first split is opened
+        this.reader = new EmptyRecordReader<K, Writable>();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /**
+     * Creates the reusable record wrapper and opens the first split assigned to this node.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Writable>();
+        nextInputSplit();
+    }
+
+    /**
+     * Advances to the next record, moving on to further splits when the current one is exhausted.
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        if (reader.next(key, value)) {
+            return true;
+        }
+        while (nextInputSplit()) {
+            if (reader.next(key, value)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the record read by the last successful {@link #hasNext()}. The same wrapper and value
+     * objects are reused for every record.
+     */
+    @Override
+    public IRawRecord<Writable> next() throws IOException {
+        record.set(value);
+        return record;
+    }
+
+    /**
+     * Claims and opens the next unread split scheduled to this node.
+     *
+     * @return true if a split was opened, false if none is left
+     */
+    private boolean nextInputSplit() throws IOException {
+        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+            // only splits scheduled to this node are candidates
+            if (!readSchedule[currentSplitIndex].equals(nodeName)) {
+                continue;
+            }
+            // claim the split; another partition on this node may already have taken it
+            synchronized (read) {
+                if (read[currentSplitIndex]) {
+                    continue;
+                }
+                read[currentSplitIndex] = true;
+            }
+            if (snapshot != null && !matchesSnapshot(currentSplitIndex)) {
+                // skip if not the same file stored in the files snapshot
+                continue;
+            }
+            reader.close();
+            reader = getRecordReader(currentSplitIndex);
+            return true;
+        }
+        return false;
+    }
+
+    /** Whether the split's file has the modification time recorded in the snapshot at the same index. */
+    private boolean matchesSnapshot(int splitIndex) throws IOException {
+        String fileName = ((FileSplit) inputSplits[splitIndex]).getPath().toUri().getPath();
+        FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+        return fileStatus.getModificationTime() == snapshot.get(splitIndex).getLastModefiedTime().getTime();
+    }
+
+    /**
+     * Opens a Hadoop record reader for the given split, stores it in {@link #reader}, and resets the
+     * indexer (if any). The field is assigned before {@code indexer.reset(this)} — presumably the indexer
+     * inspects this object's current reader/split; TODO confirm before reordering.
+     */
+    @SuppressWarnings("unchecked")
+    private RecordReader<K, Writable> getRecordReader(int splitIndex) throws IOException {
+        reader = (RecordReader<K, Writable>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+        if (key == null) {
+            // key/value holders are created once and reused for all splits
+            key = reader.createKey();
+            value = (V) reader.createValue();
+        }
+        if (indexer != null) {
+            try {
+                indexer.reset(this);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        return reader;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+
+    public List<ExternalFile> getSnapshot() {
+        return snapshot;
+    }
+
+    /**
+     * Sets the files snapshot used to skip modified files and obtains the file system to check them.
+     */
+    public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
+        this.snapshot = snapshot;
+        hdfs = FileSystem.get(conf);
+    }
+
+    public int getCurrentSplitIndex() {
+        return currentSplitIndex;
+    }
+
+    public RecordReader<K, Writable> getReader() {
+        return reader;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
new file mode 100644
index 0000000..90f1558
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Buffered line reader over an HDFS {@link FSDataInputStream} that tracks the file offset of the next
+ * unconsumed byte, so {@link #seek(long)} can move within the already-buffered data without touching the
+ * underlying stream. A line ends at LF, CR or CR+LF, or at end of stream.
+ */
+public class HDFSTextLineReader {
+    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private FSDataInputStream reader;
+
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer (never negative)
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+    // file offset of buffer[bufferPosn], i.e. of the next byte readLine will consume
+    private long currentFilePos = 0L;
+
+    private static final byte CR = '\r';
+    private static final byte LF = '\n';
+
+    public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * default buffer-size (32k).
+     *
+     * @param in
+     *            The input stream
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in) throws IOException {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * given buffer-size.
+     *
+     * @param in
+     *            The input stream
+     * @param bufferSize
+     *            Size of the read buffer
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, int bufferSize) throws IOException {
+        this.reader = in;
+        this.bufferSize = bufferSize;
+        this.buffer = new byte[this.bufferSize];
+        currentFilePos = in.getPos();
+    }
+
+    /**
+     * Create a line reader without a stream; {@link #resetReader(FSDataInputStream)} must be called
+     * before reading.
+     */
+    public HDFSTextLineReader() throws IOException {
+        this.bufferSize = DEFAULT_BUFFER_SIZE;
+        this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code>
+     * specified in the given <code>Configuration</code>.
+     *
+     * @param in
+     *            input stream
+     * @param conf
+     *            configuration
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, Configuration conf) throws IOException {
+        this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+    }
+
+    /**
+     * Read one line from the InputStream into the given Text. A line
+     * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+     * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+     * line.
+     *
+     * @param str
+     *            the object to store the given line (without newline)
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str;
+     *            the rest of the line is silently discarded.
+     * @param maxBytesToConsume
+     *            the maximum number of bytes to consume
+     *            in this call. This is only a hint, because if the line cross
+     *            this threshold, we allow it to happen. It can overshoot
+     *            potentially by as much as one buffer length.
+     * @return the number of bytes read including the (longest) newline
+     *         found.
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
+        /* We're reading data from in, but the head of the stream may be
+         * already buffered in buffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         *    everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         *    copy to str.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         *    in CR. In this case we copy everything up to CR to str, but
+         *    we also need to see what follows CR: if it's LF, then we
+         *    need consume LF as well, so next call to readLine will read
+         *    from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        str.clear();
+        int txtLength = 0; // tracks str.getLength(), as an optimization
+        int newlineLength = 0; // length of terminating newline
+        boolean prevCharCR = false; // true if prev char was CR
+        long bytesConsumed = 0;
+        do {
+            int startPosn = bufferPosn; // starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                if (prevCharCR) {
+                    ++bytesConsumed; // account for CR from previous read
+                }
+                bufferLength = reader.read(buffer);
+                if (bufferLength <= 0) {
+                    // EOF: read() returns -1. Treat the buffer as empty so the file position computed
+                    // below is not one byte past the end of the stream.
+                    bufferLength = 0;
+                    break;
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { // search for newline
+                if (buffer[bufferPosn] == LF) {
+                    newlineLength = prevCharCR ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following byte
+                    break;
+                }
+                if (prevCharCR) { // CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (buffer[bufferPosn] == CR);
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; // CR at the end of the buffer
+            }
+            bytesConsumed += readLength;
+            int appendLength = readLength - newlineLength;
+            if (appendLength > maxLineLength - txtLength) {
+                appendLength = maxLineLength - txtLength;
+            }
+            if (appendLength > 0) {
+                str.append(buffer, startPosn, appendLength);
+                txtLength += appendLength;
+            }
+        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+        if (bytesConsumed > Integer.MAX_VALUE) {
+            throw new IOException("Too many bytes before newline: " + bytesConsumed);
+        }
+        // the stream is positioned after the buffered data; step back to the cursor
+        currentFilePos = reader.getPos() - bufferLength + bufferPosn;
+        return (int) bytesConsumed;
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str.
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength) throws IOException {
+        return readLine(str, maxLineLength, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Moves the read position to the absolute file offset {@code desired}. If the offset lies ahead of the
+     * cursor but inside the buffered data, only the buffer cursor moves; otherwise the underlying stream is
+     * repositioned and the buffer is discarded.
+     *
+     * @param desired
+     *            absolute byte offset in the file
+     * @throws IOException
+     *             if the underlying seek fails
+     */
+    public void seek(long desired) throws IOException {
+        if (reader.getPos() <= desired || currentFilePos > desired) {
+            // desired position is ahead of stream or before the current position, seek to position
+            reader.seek(desired);
+            bufferLength = 0;
+            bufferPosn = 0;
+            currentFilePos = desired;
+        } else if (currentFilePos < desired) {
+            // desired position is in the buffer
+            int difference = (int) (desired - currentFilePos);
+            bufferPosn += difference;
+            currentFilePos = desired;
+        }
+        // otherwise desired == currentFilePos: already there
+    }
+
+    public FSDataInputStream getReader() {
+        return reader;
+    }
+
+    /**
+     * Switches to a new stream, discarding any buffered data; the buffer itself is reused.
+     */
+    public void resetReader(FSDataInputStream reader) throws IOException {
+        this.reader = reader;
+        bufferLength = 0;
+        bufferPosn = 0;
+        currentFilePos = reader.getPos();
+    }
+
+    /**
+     * Closes the underlying stream, if one has been set.
+     */
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
new file mode 100644
index 0000000..95d76ba
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for RC files: positions at a record id's offset, then advances row by row to the
+ * requested row.
+ */
+public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
+    private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
+    private Reader reader;
+    private LongWritable key = new LongWritable();
+    private BytesRefArrayWritable value = new BytesRefArrayWritable();
+    private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
+    // offset (RecordId.getOffset()) the reader is positioned at; -1 right after opening a file
+    private long offset;
+    // index of the row currently held in value within that offset; -1 before the first row
+    private int row;
+
+    public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws IOException {
+        return Writable.class;
+    }
+
+    /**
+     * Reads the row identified by {@code rid} into the reusable record. Rows ahead of the current one at
+     * the same offset are reached by reading forward; anything else re-positions to the offset first.
+     */
+    @Override
+    protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
+        // an earlier row at the same offset cannot be reached by reading forward, so re-position too
+        if (rid.getOffset() != offset || rid.getRow() < row) {
+            offset = rid.getOffset();
+            if (reader.getPosition() != offset) {
+                reader.seek(offset);
+            }
+            reader.resetBuffer();
+            row = -1;
+        }
+
+        // skip rows to the record row
+        while (row < rid.getRow()) {
+            reader.next(key);
+            reader.getCurrentRow(value);
+            row++;
+        }
+        record.set(value);
+        return record;
+    }
+
+    /**
+     * Closes the current file (best effort) and forgets it, so a second call does not close it again.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file", e);
+        } finally {
+            reader = null;
+        }
+    }
+
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        reader = new Reader(fs, new Path(file.getFileName()), conf);
+        offset = -1;
+        row = -1;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
new file mode 100644
index 0000000..23e647f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for Hadoop sequence files: seeks to a record id's offset and reads one key/value pair.
+ */
+public class SequenceLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
+    private Reader reader;
+    // key holder, instantiated from the file's key class on open
+    private Writable key;
+
+    public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /**
+     * Reads the pair at the record's offset; the value goes into the inherited {@code value} field
+     * (declared in AbstractCharRecordLookupReader, not visible here).
+     */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        reader.seek(rid.getOffset());
+        reader.next(key, value);
+    }
+
+    /**
+     * Closes the current file (best effort) and forgets it, so a second call does not close it again.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file ", e);
+        } finally {
+            reader = null;
+        }
+    }
+
+    /**
+     * Opens the file and creates key/value holders of the classes recorded in its header. The value class
+     * is cast to {@link Text}, so files with other value classes fail here.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        reader = new SequenceFile.Reader(fs, new Path(file.getFileName()), conf);
+        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
new file mode 100644
index 0000000..2e1a11a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for HDFS text files. It seeks to a record offset taken from a
+ * {@link RecordId} and reads a line from there into the inherited {@code value}.
+ */
+public class TextLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
+
+    /** Line reader re-attached to each newly opened file; created lazily on the first open. */
+    private HDFSTextLineReader reader;
+
+    public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /** Seeks to the record's offset and reads the line found there into {@code value}. */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        reader.seek(rid.getOffset());
+        reader.readLine(value);
+    }
+
+    /** Closes the current file if a reader exists; failures are logged, not propagated. */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file ", e);
+        }
+    }
+
+    /** Opens the current file and points the (reused) line reader at the new stream. */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        if (reader == null) {
+            reader = new HDFSTextLineReader();
+        }
+        reader.resetReader(fs.open(new Path(file.getFileName())));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
new file mode 100644
index 0000000..13cd26a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.rss;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.log4j.Logger;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherException;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+import com.sun.syndication.io.FeedException;
+
+/**
+ * Record reader that polls a single RSS/Atom feed URL through the ROME fetcher and hands
+ * out the feed's entries one at a time.
+ * <p>
+ * A fetch only buffers entries when {@link #setModified(boolean)} was called with
+ * {@code true} while that fetch was in progress.
+ */
+public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
+
+    private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
+    // Raised by the registered fetcher listener; cleared before every fetch.
+    private boolean modified = false;
+    private final Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
+    private final FeedFetcherCache feedInfoCache;
+    private final FeedFetcher fetcher;
+    private final FetcherEventListenerImpl listener;
+    private final URL feedUrl;
+    private final GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
+    private boolean done = false;
+
+    /**
+     * @param url the feed URL to poll
+     * @throws MalformedURLException if {@code url} is not a valid URL
+     */
+    public RSSRecordReader(String url) throws MalformedURLException {
+        feedUrl = new URL(url);
+        feedInfoCache = HashMapFeedInfoCache.getInstance();
+        fetcher = new HttpURLFeedFetcher(feedInfoCache);
+        listener = new FetcherEventListenerImpl(this, LOGGER);
+        fetcher.addFetcherEventListener(listener);
+    }
+
+    public boolean isModified() {
+        return modified;
+    }
+
+    /** Detaches this reader's listener from the fetcher. */
+    @Override
+    public void close() throws IOException {
+        fetcher.removeFetcherEventListener(listener);
+    }
+
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    /** Returns true until {@link #stop()} is called. */
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    /**
+     * Returns the next buffered feed entry, fetching the feed when the buffer is empty.
+     *
+     * @return the entry wrapped in a reused record, or null if stopped or nothing new was fetched
+     * @throws IOException if fetching or parsing the feed fails
+     */
+    @Override
+    public IRawRecord<SyndEntryImpl> next() throws IOException {
+        if (done) {
+            return null;
+        }
+        try {
+            SyndEntryImpl feedEntry = getNextRSSFeed();
+            if (feedEntry == null) {
+                return null;
+            }
+            record.set(feedEntry);
+            return record;
+        } catch (IOException e) {
+            // Already the declared type; don't wrap it a second time.
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public boolean stop() {
+        done = true;
+        return true;
+    }
+
+    public void setModified(boolean modified) {
+        this.modified = modified;
+    }
+
+    /** Pops the next buffered entry, triggering a fetch first when the buffer is empty. */
+    private SyndEntryImpl getNextRSSFeed() throws Exception {
+        if (rssFeedBuffer.isEmpty()) {
+            fetchFeed();
+        }
+        // poll() yields null when the fetch produced nothing to buffer.
+        return rssFeedBuffer.poll();
+    }
+
+    /** Retrieves the feed once and buffers its entries if this fetch marked it modified. */
+    @SuppressWarnings("unchecked")
+    private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
+        // Clear the flag so that only a notification raised during *this* fetch enables buffering.
+        // Without the reset the flag stays true after the first retrieval and every poll
+        // re-queues all entries.
+        modified = false;
+        // Retrieve the feed. We will get a Feed Polled event and then a
+        // Feed Retrieved event (assuming the feed is valid).
+        SyndFeed feed = fetcher.retrieveFeed(feedUrl);
+        if (!modified || feed == null) {
+            return;
+        }
+        List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info(feedUrl + " retrieved");
+            LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + fetchedFeeds.size()
+                    + " entries.");
+        }
+        rssFeedBuffer.addAll(fetchedFeeds);
+    }
+}
+
+/**
+ * ROME fetcher listener that logs fetch events and updates the owning
+ * {@link RSSRecordReader}'s "modified" flag.
+ */
+class FetcherEventListenerImpl implements FetcherListener {
+
+    private final RSSRecordReader reader;
+    private final Logger logger;
+
+    public FetcherEventListenerImpl(RSSRecordReader reader, Logger logger) {
+        this.reader = reader;
+        this.logger = logger;
+    }
+
+    /**
+     * Logs the event and updates the reader's flag.
+     * <ul>
+     * <li>A retrieved feed marks the reader modified.</li>
+     * <li>An unchanged feed clears the flag, so the reader does not queue the same entries again.</li>
+     * </ul>
+     *
+     * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+     */
+    @Override
+    public void fetcherEvent(FetcherEvent event) {
+        String eventType = event.getEventType();
+        if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+            if (logger.isInfoEnabled()) {
+                logger.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
+            }
+        } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+            if (logger.isInfoEnabled()) {
+                logger.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
+            }
+            reader.setModified(true);
+        } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+            if (logger.isInfoEnabled()) {
+                logger.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
+            }
+            // Nothing new on the server side.
+            reader.setModified(false);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
new file mode 100644
index 0000000..bbe485c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.rss;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Factory for {@link RSSRecordReader}s. It splits the comma-separated
+ * {@code ExternalDataConstants.KEY_RSS_URL} setting into feed URLs and creates one
+ * partition (and one reader) per URL.
+ */
+public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    private final List<String> urls = new ArrayList<String>();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** One partition per configured feed URL. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(urls.size());
+    }
+
+    /**
+     * Stores the configuration and parses the feed URL list.
+     *
+     * @throws IllegalArgumentException if no URL setting is present or it contains no usable URL
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
+        if (url == null) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+        initializeURLs(url);
+        if (urls.isEmpty()) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+    }
+
+    /**
+     * Replaces {@link #urls} with the comma-separated entries of {@code url}. Entries are trimmed,
+     * and empty ones (e.g. from "a,,b" or a trailing ", ") are skipped so they don't become
+     * partitions that fail later.
+     */
+    private void initializeURLs(String url) {
+        urls.clear();
+        for (String rssURL : url.split(",")) {
+            String trimmed = rssURL.trim();
+            if (!trimmed.isEmpty()) {
+                urls.add(trimmed);
+            }
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /** Creates a reader for the URL assigned to {@code partition}. */
+    @Override
+    public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
new file mode 100644
index 0000000..6225b82
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.AInputStreamReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * Base class for record readers that pull characters from an {@link AInputStream} into a
+ * char buffer. Subclasses implement {@code hasNext()} to frame records from that buffer
+ * into the shared {@link #record}.
+ */
+public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+    protected AInputStreamReader reader;
+    protected CharArrayRecord record;
+    // Read buffer: bufferLength chars are valid, bufferPosn is the next unread index.
+    protected char[] inputBuffer;
+    protected int bufferLength = 0;
+    protected int bufferPosn = 0;
+    protected IExternalIndexer indexer;
+    protected boolean done = false;
+
+    /** Returns the shared record instance that {@code hasNext()} fills. */
+    @Override
+    public IRawRecord<char[]> next() throws IOException {
+        return record;
+    }
+
+    /**
+     * Closes the underlying stream reader at most once.
+     * <p>
+     * It is safe to call repeatedly, or before {@link #setInputStream(AInputStream)}. The reader
+     * is marked done even if closing fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (!done && reader != null) {
+                reader.close();
+            }
+        } finally {
+            done = true;
+        }
+    }
+
+    /** Wraps {@code inputStream} in the character reader that subclasses read from. */
+    public void setInputStream(AInputStream inputStream) throws IOException {
+        this.reader = new AInputStreamReader(inputStream);
+    }
+
+    /** Allocates the record and the read buffer. */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new CharArrayRecord();
+        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+
+    /**
+     * Asks the underlying stream reader to stop.
+     *
+     * @return true if the request was delivered; false if there is no reader or stopping failed
+     */
+    @Override
+    public boolean stop() {
+        if (reader == null) {
+            return false;
+        }
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            // NOTE(review): this class has no logger; consider routing this through the project logger.
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    /** Hands the flow controller to the stream reader; expects an AbstractFeedDataFlowController. */
+    @Override
+    public void setController(IDataFlowController controller) {
+        reader.setController((AbstractFeedDataFlowController) controller);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
new file mode 100644
index 0000000..f02bd93
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base factory for record readers layered over an {@link IInputStreamProviderFactory}.
+ * Partitioning, indexability and snapshots are delegated to the input stream factory.
+ * Subclasses create the concrete reader and pass it through {@link #configureReader}.
+ */
+public abstract class AbstractStreamRecordReaderFactory<T>
+        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
+
+    private static final long serialVersionUID = 1L;
+    protected IInputStreamProviderFactory inputStreamFactory;
+    protected Map<String, String> configuration;
+
+    /** Sets the factory that supplies input streams to created readers; returns this for chaining. */
+    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
+            IInputStreamProviderFactory inputStreamFactory) {
+        this.inputStreamFactory = inputStreamFactory;
+        return this;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return inputStreamFactory.getPartitionConstraint();
+    }
+
+    /** Stores the configuration, configures the stream factory, then the subclass. */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        inputStreamFactory.configure(configuration);
+        configureStreamReaderFactory(configuration);
+    }
+
+    /** Hook for subclass-specific configuration; runs after the stream factory is configured. */
+    protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
+
+    @Override
+    public boolean isIndexible() {
+        return inputStreamFactory.isIndexible();
+    }
+
+    /**
+     * Forwards the snapshot to the input stream factory.
+     *
+     * @throws UnsupportedOperationException if the stream factory cannot accept snapshots
+     */
+    @Override
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
+        if (!(inputStreamFactory instanceof IIndexibleExternalDataSource)) {
+            throw new UnsupportedOperationException(
+                    "Input stream factory " + inputStreamFactory + " does not support external file snapshots");
+        }
+        ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
+    }
+
+    @Override
+    public boolean isIndexingOp() {
+        if (inputStreamFactory.isIndexible()) {
+            return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
+        }
+        return false;
+    }
+
+    /**
+     * Prepares {@code recordReader} and configures it.
+     * <p>
+     * It attaches a freshly created input stream for {@code partition}. During an indexing
+     * operation it also attaches the stream provider's indexer.
+     *
+     * @return the configured reader
+     */
+    protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        IInputStreamProvider inputStreamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
+        IExternalIndexer indexer = null;
+        if (inputStreamFactory.isIndexible()) {
+            if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
+                indexer = ((IIndexingDatasource) inputStreamProvider).getIndexer();
+            }
+        }
+        // The stream must be set before configure(): readers may consume a header there.
+        recordReader.setInputStream(inputStreamProvider.getInputStream());
+        recordReader.setIndexer(indexer);
+        recordReader.configure(configuration);
+        return recordReader;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
new file mode 100644
index 0000000..f8f7cda
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+/**
+ * Stream record reader that frames one record per line; lines may end with LF, CR or CRLF.
+ * When the configuration says the input has a header, the first line is skipped in
+ * {@link #configure(Map)}.
+ */
+public class LineRecordReader extends AbstractStreamRecordReader {
+
+    protected boolean prevCharCR;
+    protected int newlineLength;
+    protected int recordNumber = 0;
+
+    @Override
+    public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
+        /* We're reading data from the input stream, but the head of the stream may already be
+         * buffered in inputBuffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         * everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         * copy to record.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         * in CR. In this case we copy everything up to CR to record, but
+         * we also need to see what follows CR: if it's LF, then we
+         * need to consume LF as well, so the next call will read
+         * from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        newlineLength = 0; //length of terminating newline
+        prevCharCR = false; //true if prev char was CR
+        record.reset();
+        // Total characters appended to the record across all buffer refills of this call.
+        int recordLength = 0;
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    // EOF. Emit an unterminated last line if anything was accumulated. Test the total,
+                    // not just the last chunk's length: a last chunk holding only a deferred CR
+                    // contributes nothing, and the line would otherwise be dropped.
+                    if (recordLength > 0) {
+                        record.endRecord();
+                        recordNumber++;
+                        return true;
+                    }
+                    close();
+                    return false; //EOF
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                    newlineLength = (prevCharCR) ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following char
+                    break;
+                }
+                if (prevCharCR) { //CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; //CR at the end of the buffer
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+                recordLength += readLength;
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+
+    /** Allocates buffers and, if the configuration declares a header, consumes the first line. */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        if (ExternalDataUtils.hasHeader(configuration)) {
+            if (hasNext()) {
+                next();
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
new file mode 100644
index 0000000..f0867d3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+ LineRecordReader recordReader;
+ if (quoteString != null) {
+ recordReader = new QuotedLineRecordReader();
+ } else {
+ recordReader = new LineRecordReader();
+ }
+ return configureReader(recordReader, ctx, partition);
+ }
+
+ @Override
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
new file mode 100644
index 0000000..a8eb07b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+/**
+ * Line record reader that does not end a record on newlines that occur inside a quoted
+ * section.
+ * <p>
+ * The quote character comes from {@code ExternalDataConstants.KEY_QUOTE}. A quote preceded by
+ * an unescaped {@code ExternalDataConstants.ESCAPE} character does not open or close a quoted
+ * section.
+ */
+public class QuotedLineRecordReader extends LineRecordReader {
+
+    private char quote;
+    private boolean prevCharEscape;
+    private boolean inQuote;
+
+    /**
+     * Validates and stores the quote character, then performs the base configuration.
+     *
+     * @throws AsterixException if the quote setting is missing or not exactly one character
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteString == null || quoteString.length() != 1) {
+            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+        }
+        // Must be set before super.configure(): it may skip a header line via hasNext(),
+        // which needs the quote character to find where that line ends.
+        this.quote = quoteString.charAt(0);
+        super.configure(configuration);
+    }
+
+    /**
+     * Frames the next record, treating newlines inside quotes as ordinary characters.
+     *
+     * @throws IOException if the input ends while a quoted section is still open
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
+        newlineLength = 0;
+        prevCharCR = false;
+        prevCharEscape = false;
+        inQuote = false;
+        record.reset();
+        // Total characters appended to the record across all buffer refills of this call.
+        int recordLength = 0;
+        do {
+            int startPosn = bufferPosn;
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    // EOF. Test the total length so that a record whose last chunk was only a
+                    // deferred CR is not dropped.
+                    if (recordLength > 0) {
+                        if (inQuote) {
+                            throw new IOException("malformed input record ended inside quote");
+                        }
+                        record.endRecord();
+                        recordNumber++;
+                        return true;
+                    }
+                    close();
+                    return false;
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) {
+                char c = inputBuffer[bufferPosn];
+                if (!inQuote) {
+                    if (c == ExternalDataConstants.LF) {
+                        newlineLength = (prevCharCR) ? 2 : 1;
+                        ++bufferPosn;
+                        break;
+                    }
+                    if (prevCharCR) {
+                        newlineLength = 1;
+                        break;
+                    }
+                    prevCharCR = (c == ExternalDataConstants.CR);
+                    if (c == quote && !prevCharEscape) {
+                        inQuote = true;
+                    }
+                } else if (c == quote && !prevCharEscape) {
+                    // inside a quote, only look for the closing quote
+                    inQuote = false;
+                }
+                // An escape character escapes only the next character, so an escaped escape
+                // (e.g. "\\") does not escape what follows it, inside or outside quotes.
+                prevCharEscape = !prevCharEscape && c == ExternalDataConstants.ESCAPE;
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; // CR at the end of the buffer
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+                recordLength += readLength;
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
new file mode 100644
index 0000000..d469cb3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+
+ private int depth;
+ private boolean prevCharEscape;
+ private boolean inString;
+ private char recordStart;
+ private char recordEnd;
+ private int recordNumber = 0;
+
+ public int getRecordNumber() {
+ return recordNumber;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ super.configure(configuration);
+ String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
+ String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
+ if (recStartString != null) {
+ if (recStartString.length() != 1) {
+ throw new AsterixException(
+ ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
+ ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
+ }
+ recordStart = recStartString.charAt(0);
+ } else {
+ recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
+ }
+ if (recEndString != null) {
+ if (recEndString.length() != 1) {
+ throw new AsterixException(
+ ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
+ ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
+ }
+ recordEnd = recEndString.charAt(0);
+ } else {
+ recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (done) {
+ return false;
+ }
+ record.reset();
+ boolean hasStarted = false;
+ boolean hasFinished = false;
+ prevCharEscape = false;
+ inString = false;
+ depth = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ close();
+ return false; // EOF
+ }
+ }
+ if (!hasStarted) {
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+ if (inputBuffer[bufferPosn] == recordStart) {
+ startPosn = bufferPosn;
+ hasStarted = true;
+ depth = 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
+ && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
+ && inputBuffer[bufferPosn] != ExternalDataConstants.LF
+ && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
+ // corrupted file. clear the buffer and stop reading
+ reader.skipError();
+ bufferPosn = bufferLength = 0;
+ throw new IOException("Malformed input stream");
+ }
+ }
+ }
+ if (hasStarted) {
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+ if (inString) {
+ // we are in a string, we only care about the string end
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+ inString = false;
+ }
+ if (prevCharEscape) {
+ prevCharEscape = false;
+ } else {
+ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
+ } else {
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+ inString = true;
+ } else if (inputBuffer[bufferPosn] == recordStart) {
+ depth += 1;
+ } else if (inputBuffer[bufferPosn] == recordEnd) {
+ depth -= 1;
+ if (depth == 0) {
+ hasFinished = true;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ int appendLength = bufferPosn - startPosn;
+ if (appendLength > 0) {
+ record.append(inputBuffer, startPosn, appendLength);
+ }
+ } while (!hasFinished);
+ record.endRecord();
+ recordNumber++;
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ try {
+ reader.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
new file mode 100644
index 0000000..ec8eac9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+ SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
+ return configureReader(recordReader, ctx, partition);
+ }
+
+ @Override
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
new file mode 100644
index 0000000..084d6d0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+public class TwitterPullRecordReader implements IRecordReader<Status> {
+
+ private String keywords;
+ private Query query;
+ private Twitter twitter;
+ private int requestInterval = 5; // seconds
+ private QueryResult result;
+ private int nextTweetIndex = 0;
+ private long lastTweetIdReceived = 0;
+ private GenericRecord<Status> record;
+
+ public TwitterPullRecordReader() {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ twitter = TwitterUtil.getTwitterService(configuration);
+ keywords = configuration.get(SearchAPIConstants.QUERY);
+ requestInterval = Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL));
+ query = new Query(keywords);
+ query.setCount(100);
+ record = new GenericRecord<Status>();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return true;
+ }
+
+ @Override
+ public IRawRecord<Status> next() throws IOException, InterruptedException {
+ if (result == null || nextTweetIndex >= result.getTweets().size()) {
+ Thread.sleep(1000 * requestInterval);
+ query.setSinceId(lastTweetIdReceived);
+ try {
+ result = twitter.search(query);
+ } catch (TwitterException e) {
+ throw new HyracksDataException(e);
+ }
+ nextTweetIndex = 0;
+ }
+ if (result != null && !result.getTweets().isEmpty()) {
+ List<Status> tw = result.getTweets();
+ Status tweet = tw.get(nextTweetIndex++);
+ if (lastTweetIdReceived < tweet.getId()) {
+ lastTweetIdReceived = tweet.getId();
+ }
+ record.set(tweet);
+ return record;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
new file mode 100644
index 0000000..764ac1d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+
+public class TwitterPushRecordReader implements IRecordReader<Status> {
+ private LinkedBlockingQueue<Status> inputQ;
+ private TwitterStream twitterStream;
+ private GenericRecord<Status> record;
+ private boolean closed = false;
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ twitterStream.clearListeners();
+ twitterStream.cleanUp();
+ twitterStream = null;
+ closed = true;
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ record = new GenericRecord<Status>();
+ inputQ = new LinkedBlockingQueue<Status>();
+ twitterStream = TwitterUtil.getTwitterStream(configuration);
+ twitterStream.addListener(new TweetListener(inputQ));
+ FilterQuery query = TwitterUtil.getFilterQuery(configuration);
+ if (query != null) {
+ twitterStream.filter(query);
+ } else {
+ twitterStream.sample();
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return !closed;
+ }
+
+ @Override
+ public IRawRecord<Status> next() throws IOException, InterruptedException {
+ Status tweet = inputQ.poll();
+ if (tweet == null) {
+ return null;
+ }
+ record.set(tweet);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ try {
+ close();
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ private class TweetListener implements StatusListener {
+
+ private LinkedBlockingQueue<Status> inputQ;
+
+ public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+ this.inputQ = inputQ;
+ }
+
+ @Override
+ public void onStatus(Status tweet) {
+ inputQ.add(tweet);
+ }
+
+ @Override
+ public void onException(Exception arg0) {
+
+ }
+
+ @Override
+ public void onDeletionNotice(StatusDeletionNotice arg0) {
+ }
+
+ @Override
+ public void onScrubGeo(long arg0, long arg1) {
+ }
+
+ @Override
+ public void onStallWarning(StallWarning arg0) {
+ }
+
+ @Override
+ public void onTrackLimitationNotice(int arg0) {
+ }
+ }
+}