You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:52:17 UTC
[4/5] incubator-asterixdb git commit: Add flush() to IFrameWriter
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
deleted file mode 100644
index 3a82a68..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-
-public class LookupReaderFactoryProvider {
-
- @SuppressWarnings("rawtypes")
- public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
- String inputFormat = HDFSUtils.getInputFormatClassName(configuration);
- if (inputFormat.equals(ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT)
- || inputFormat.equals(ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)
- || inputFormat.equals(ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
- HDFSLookupReaderFactory<Object> readerFactory = new HDFSLookupReaderFactory<Object>();
- readerFactory.configure(configuration);
- return readerFactory;
- } else {
- throw new AsterixException("Unrecognized external format");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
deleted file mode 100644
index 49e67e9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-
-public class QuotedLineRecordReader extends LineRecordReader {
-
- private char quote;
- private boolean prevCharEscape;
- private boolean inQuote;
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- super.configure(configuration);
- String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
- if (quoteString == null || quoteString.length() != 1) {
- throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
- ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
- }
- this.quote = quoteString.charAt(0);
- }
-
- @Override
- public boolean hasNext() throws IOException {
- if (done) {
- return false;
- }
- newlineLength = 0;
- prevCharCR = false;
- prevCharEscape = false;
- record.reset();
- int readLength = 0;
- inQuote = false;
- do {
- int startPosn = bufferPosn;
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- {
- if (readLength > 0) {
- if (inQuote) {
- throw new IOException("malformed input record ended inside quote");
- }
- record.endRecord();
- recordNumber++;
- return true;
- }
- close();
- return false;
- }
- }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) {
- if (!inQuote) {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn;
- break;
- }
- if (prevCharCR) {
- newlineLength = 1;
- break;
- }
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = true;
- }
- }
- if (prevCharEscape) {
- prevCharEscape = false;
- } else {
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
- }
- } else {
- // only look for next quote
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = false;
- }
- }
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
- }
- }
- readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength;
- }
- if (readLength > 0) {
- record.append(inputBuffer, startPosn, readLength);
- }
- } while (newlineLength == 0);
- recordNumber++;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
deleted file mode 100644
index 5c33502..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-
-public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
- public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
- super(snapshotAccessor, fs, conf);
- }
-
- private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
- private Reader reader;
- private LongWritable key = new LongWritable();
- private BytesRefArrayWritable value = new BytesRefArrayWritable();
- private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
- private long offset;
- private int row;
-
- @Override
- public Class<?> getRecordClass() throws IOException {
- return Writable.class;
- }
-
- @Override
- protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
- if (rid.getOffset() != offset) {
- offset = rid.getOffset();
- if (reader.getPosition() != offset)
- reader.seek(offset);
- reader.resetBuffer();
- row = -1;
- }
-
- // skip rows to the record row
- while (row < rid.getRow()) {
- reader.next(key);
- reader.getCurrentRow(value);
- row++;
- }
- record.set(value);
- return record;
- }
-
- @Override
- protected void closeFile() {
- if (reader == null) {
- return;
- }
- try {
- reader.close();
- } catch (Exception e) {
- LOGGER.warn("Error closing HDFS file", e);
- }
- }
-
- @Override
- protected void openFile() throws IllegalArgumentException, IOException {
- reader = new Reader(fs, new Path(file.getFileName()), conf);
- offset = -1;
- row = -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
deleted file mode 100644
index 1c2dc30..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.log4j.Logger;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.fetcher.FeedFetcher;
-import com.sun.syndication.fetcher.FetcherEvent;
-import com.sun.syndication.fetcher.FetcherException;
-import com.sun.syndication.fetcher.FetcherListener;
-import com.sun.syndication.fetcher.impl.FeedFetcherCache;
-import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
-import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
-import com.sun.syndication.io.FeedException;
-
-public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
-
- private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
- private boolean modified = false;
- private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
- private FeedFetcherCache feedInfoCache;
- private FeedFetcher fetcher;
- private FetcherEventListenerImpl listener;
- private URL feedUrl;
- private GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
- private boolean done = false;
-
- public RSSRecordReader(String url) throws MalformedURLException {
- feedUrl = new URL(url);
- feedInfoCache = HashMapFeedInfoCache.getInstance();
- fetcher = new HttpURLFeedFetcher(feedInfoCache);
- listener = new FetcherEventListenerImpl(this, LOGGER);
- fetcher.addFetcherEventListener(listener);
- }
-
- public boolean isModified() {
- return modified;
- }
-
- @Override
- public void close() throws IOException {
- fetcher.removeFetcherEventListener(listener);
- }
-
- @Override
- public void configure(Map<String, String> configurations) throws Exception {
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return !done;
- }
-
- @Override
- public IRawRecord<SyndEntryImpl> next() throws IOException {
- if (done) {
- return null;
- }
- try {
- SyndEntryImpl feedEntry;
- feedEntry = getNextRSSFeed();
- if (feedEntry == null) {
- return null;
- }
- record.set(feedEntry);
- return record;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public Class<SyndEntryImpl> getRecordClass() throws IOException {
- return SyndEntryImpl.class;
- }
-
- @Override
- public boolean stop() {
- done = true;
- return true;
- }
-
- public void setModified(boolean modified) {
- this.modified = modified;
- }
-
- private SyndEntryImpl getNextRSSFeed() throws Exception {
- if (rssFeedBuffer.isEmpty()) {
- fetchFeed();
- }
- if (rssFeedBuffer.isEmpty()) {
- return null;
- } else {
- return rssFeedBuffer.remove();
- }
- }
-
- @SuppressWarnings("unchecked")
- private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
- // Retrieve the feed.
- // We will get a Feed Polled Event and then a
- // Feed Retrieved event (assuming the feed is valid)
- SyndFeed feed = fetcher.retrieveFeed(feedUrl);
- if (modified) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(feedUrl + " retrieved");
- LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + feed.getEntries().size()
- + " entries.");
- }
- List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
- rssFeedBuffer.addAll(fetchedFeeds);
- }
- }
-}
-
-class FetcherEventListenerImpl implements FetcherListener {
-
- private RSSRecordReader reader;
- private Logger LOGGER;
-
- public FetcherEventListenerImpl(RSSRecordReader reader, Logger LOGGER) {
- this.reader = reader;
- this.LOGGER = LOGGER;
- }
-
- /**
- * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
- */
- @Override
- public void fetcherEvent(FetcherEvent event) {
- String eventType = event.getEventType();
- if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
- }
- } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
- }
- (reader).setModified(true);
- } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
- }
- (reader).setModified(true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
deleted file mode 100644
index 84c96d0..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-
-public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
-
- private int depth;
- private boolean prevCharEscape;
- private boolean inString;
- private char recordStart;
- private char recordEnd;
- private int recordNumber = 0;
-
- public int getRecordNumber() {
- return recordNumber;
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- super.configure(configuration);
- String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
- String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
- if (recStartString != null) {
- if (recStartString.length() != 1) {
- throw new AsterixException(
- ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
- ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
- }
- recordStart = recStartString.charAt(0);
- } else {
- recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
- }
- if (recEndString != null) {
- if (recEndString.length() != 1) {
- throw new AsterixException(
- ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
- ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
- }
- recordEnd = recEndString.charAt(0);
- } else {
- recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
- }
- }
-
- @Override
- public boolean hasNext() throws Exception {
- if (done) {
- return false;
- }
- record.reset();
- boolean hasStarted = false;
- boolean hasFinished = false;
- prevCharEscape = false;
- inString = false;
- depth = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- close();
- return false; // EOF
- }
- }
- if (!hasStarted) {
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
- if (inputBuffer[bufferPosn] == recordStart) {
- startPosn = bufferPosn;
- hasStarted = true;
- depth = 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
- && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
- && inputBuffer[bufferPosn] != ExternalDataConstants.LF
- && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
- // corrupted file. clear the buffer and stop reading
- reader.skipError();
- bufferPosn = bufferLength = 0;
- throw new IOException("Malformed input stream");
- }
- }
- }
- if (hasStarted) {
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
- if (inString) {
- // we are in a string, we only care about the string end
- if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
- inString = false;
- }
- if (prevCharEscape) {
- prevCharEscape = false;
- } else {
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
- }
- } else {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
- inString = true;
- } else if (inputBuffer[bufferPosn] == recordStart) {
- depth += 1;
- } else if (inputBuffer[bufferPosn] == recordEnd) {
- depth -= 1;
- if (depth == 0) {
- hasFinished = true;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- }
- }
- }
- }
-
- int appendLength = bufferPosn - startPosn;
- if (appendLength > 0) {
- record.append(inputBuffer, startPosn, appendLength);
- }
- } while (!hasFinished);
- record.endRecord();
- recordNumber++;
- return true;
- }
-
- @Override
- public boolean stop() {
- try {
- reader.stop();
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
deleted file mode 100644
index c294ccb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-
-public class SequenceLookupReader extends AbstractCharRecordLookupReader {
-
- public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
- super(snapshotAccessor, fs, conf);
- }
-
- private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
- private Reader reader;
- private Writable key;
-
- @Override
- protected void readRecord(RecordId rid) throws IOException {
- reader.seek(rid.getOffset());
- reader.next(key, value);
- }
-
- @Override
- protected void closeFile() {
- if (reader == null) {
- return;
- }
- try {
- reader.close();
- } catch (Exception e) {
- LOGGER.warn("Error closing HDFS file ", e);
- }
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected void openFile() throws IllegalArgumentException, IOException {
- reader = new SequenceFile.Reader(fs, new Path(file.getFileName()), conf);
- key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
deleted file mode 100644
index b276bfa..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-public class TextLookupReader extends AbstractCharRecordLookupReader {
-
- public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
- super(snapshotAccessor, fs, conf);
- }
-
- private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
- private HDFSTextLineReader reader;
-
- @Override
- protected void readRecord(RecordId rid) throws IOException {
- reader.seek(rid.getOffset());
- reader.readLine(value);
- }
-
- @Override
- protected void closeFile() {
- if (reader == null) {
- return;
- }
- try {
- reader.close();
- } catch (Exception e) {
- LOGGER.warn("Error closing HDFS file ", e);
- }
- }
-
- @Override
- protected void openFile() throws IllegalArgumentException, IOException {
- if (reader == null) {
- reader = new HDFSTextLineReader();
- }
- reader.resetReader(fs.open(new Path(file.getFileName())));;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
deleted file mode 100644
index 34d8122..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-public class TwitterPullRecordReader implements IRecordReader<Status> {
-
- private String keywords;
- private Query query;
- private Twitter twitter;
- private int requestInterval = 5; // seconds
- private QueryResult result;
- private int nextTweetIndex = 0;
- private long lastTweetIdReceived = 0;
- private GenericRecord<Status> record;
-
- public TwitterPullRecordReader() {
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- twitter = TwitterUtil.getTwitterService(configuration);
- keywords = configuration.get(SearchAPIConstants.QUERY);
- requestInterval = Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL));
- query = new Query(keywords);
- query.setCount(100);
- record = new GenericRecord<Status>();
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return true;
- }
-
- @Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
- if (result == null || nextTweetIndex >= result.getTweets().size()) {
- Thread.sleep(1000 * requestInterval);
- query.setSinceId(lastTweetIdReceived);
- try {
- result = twitter.search(query);
- } catch (TwitterException e) {
- throw new HyracksDataException(e);
- }
- nextTweetIndex = 0;
- }
- if (result != null && !result.getTweets().isEmpty()) {
- List<Status> tw = result.getTweets();
- Status tweet = tw.get(nextTweetIndex++);
- if (lastTweetIdReceived < tweet.getId()) {
- lastTweetIdReceived = tweet.getId();
- }
- record.set(tweet);
- return record;
- } else {
- return null;
- }
- }
-
- @Override
- public Class<Status> getRecordClass() throws IOException {
- return Status.class;
- }
-
- @Override
- public boolean stop() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
deleted file mode 100644
index 3ce6a81..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.TwitterUtil;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-
-public class TwitterPushRecordReader implements IRecordReader<Status> {
- private LinkedBlockingQueue<Status> inputQ;
- private TwitterStream twitterStream;
- private GenericRecord<Status> record;
- private boolean closed = false;
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- twitterStream.clearListeners();
- twitterStream.cleanUp();
- twitterStream = null;
- closed = true;
- }
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
- twitterStream = TwitterUtil.getTwitterStream(configuration);
- twitterStream.addListener(new TweetListener(inputQ));
- FilterQuery query = TwitterUtil.getFilterQuery(configuration);
- if (query != null) {
- twitterStream.filter(query);
- } else {
- twitterStream.sample();
- }
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return !closed;
- }
-
- @Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
- Status tweet = inputQ.poll();
- if (tweet == null) {
- return null;
- }
- record.set(tweet);
- return record;
- }
-
- @Override
- public Class<? extends Status> getRecordClass() throws IOException {
- return Status.class;
- }
-
- @Override
- public boolean stop() {
- try {
- close();
- } catch (Exception e) {
- return false;
- }
- return true;
- }
-
- private class TweetListener implements StatusListener {
-
- private LinkedBlockingQueue<Status> inputQ;
-
- public TweetListener(LinkedBlockingQueue<Status> inputQ) {
- this.inputQ = inputQ;
- }
-
- @Override
- public void onStatus(Status tweet) {
- inputQ.add(tweet);
- }
-
- @Override
- public void onException(Exception arg0) {
-
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice arg0) {
- }
-
- @Override
- public void onScrubGeo(long arg0, long arg1) {
- }
-
- @Override
- public void onStallWarning(StallWarning arg0) {
- }
-
- @Override
- public void onTrackLimitationNotice(int arg0) {
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
new file mode 100644
index 0000000..895af1b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.dcp.BucketStreamAggregator;
+import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
+import com.couchbase.client.core.dcp.BucketStreamState;
+import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+
+import rx.functions.Action1;
+
+public class CouchbaseReader implements IRecordReader<RecordWithMetadata<char[]>> {
+
+ private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
+ null);
+ private final String feedName;
+ private final short[] vbuckets;
+ private final String bucket;
+ private final String password;
+ private final String[] couchbaseNodes;
+ private AbstractFeedDataFlowController controller;
+ private Builder builder;
+ private BucketStreamAggregator bucketStreamAggregator;
+ private CouchbaseCore core;
+ private DefaultCoreEnvironment env;
+ private Thread pushThread;
+ private ArrayBlockingQueue<MutationMessage> messages;
+ private GenericRecord<RecordWithMetadata<char[]>> record;
+ private RecordWithMetadata<char[]> recordWithMetadata;
+ private boolean done = false;
+ private CharArrayRecord value;
+ private CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+ private ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ private CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long), creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
+ private static final IAType[] metaTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
+ BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+ private static final Logger LOGGER = Logger.getLogger(CouchbaseReader.class);
+
+ public CouchbaseReader(String feedName, String bucket, String password, String[] couchbaseNodes, short[] vbuckets,
+ int queueSize) throws HyracksDataException {
+ this.feedName = feedName;
+ this.bucket = bucket;
+ this.password = password;
+ this.couchbaseNodes = couchbaseNodes;
+ this.vbuckets = vbuckets;
+ this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, char[].class);
+ this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+ this.value = new CharArrayRecord();
+ recordWithMetadata.setRecord(value);
+ this.record = new GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
+ }
+
+ @Override
+ public void close() {
+ if (!done) {
+ done = true;
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.builder = DefaultCoreEnvironment.builder().dcpEnabled(CouchbaseReaderFactory.DCP_ENABLED)
+ .autoreleaseAfter(CouchbaseReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
+ this.env = builder.build();
+ this.core = new CouchbaseCore(env);
+ this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+ connect();
+ }
+
+ private void connect() {
+ core.send(new SeedNodesRequest(couchbaseNodes))
+ .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+ core.send(new OpenBucketRequest(bucket, password))
+ .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+ this.pushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ CouchbaseReader.this.run(bucketStreamAggregator);
+ }
+ }, feedName);
+ pushThread.start();
+ }
+
+ private void run(BucketStreamAggregator bucketStreamAggregator) {
+ BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+ for (int i = 0; i < vbuckets.length; i++) {
+ state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+ }
+ state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
+ @Override
+ public void call(BucketStreamStateUpdatedEvent event) {
+ if (event.partialUpdate()) {
+ } else {
+ }
+ }
+ });
+ try {
+ bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+ @Override
+ public void call(final DCPRequest dcpRequest) {
+ try {
+ if (dcpRequest instanceof SnapshotMarkerMessage) {
+ SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
+ final BucketStreamState oldState = state.get(message.partition());
+ state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
+ message.endSequenceNumber(), oldState.endSequenceNumber(),
+ message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+ } else if (dcpRequest instanceof MutationMessage) {
+
+ messages.put((MutationMessage) dcpRequest);
+ } else if (dcpRequest instanceof RemoveMessage) {
+ RemoveMessage message = (RemoveMessage) dcpRequest;
+ LOGGER.info(message.key() + " was deleted.");
+ }
+ } catch (Throwable th) {
+ LOGGER.error(th);
+ }
+ }
+ });
+ } catch (Throwable th) {
+ if (th.getCause() instanceof InterruptedException) {
+ LOGGER.warn("dcp thread was interrupted", th);
+ synchronized (this) {
+ CouchbaseReader.this.close();
+ notifyAll();
+ }
+ }
+ throw th;
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return !done;
+ }
+
+ @Override
+ public IRawRecord<RecordWithMetadata<char[]>> next() throws IOException, InterruptedException {
+ if (messages.isEmpty()) {
+ controller.flush();
+ }
+ MutationMessage message = messages.take();
+ if (message == POISON_PILL) {
+ return null;
+ }
+ String key = message.key();
+ int vbucket = message.partition();
+ long seq = message.bySequenceNumber();
+ String bucket = message.bucket();
+ long cas = message.cas();
+ long creationTime = message.creationTime();
+ int expiration = message.expiration();
+ int flags = message.flags();
+ long revSeqNumber = message.revisionSequenceNumber();
+ int lockTime = message.lockTime();
+ recordWithMetadata.reset();
+ recordWithMetadata.setMetadata(0, key);
+ recordWithMetadata.setMetadata(1, bucket);
+ recordWithMetadata.setMetadata(2, vbucket);
+ recordWithMetadata.setMetadata(3, seq);
+ recordWithMetadata.setMetadata(4, cas);
+ recordWithMetadata.setMetadata(5, creationTime);
+ recordWithMetadata.setMetadata(6, expiration);
+ recordWithMetadata.setMetadata(7, flags);
+ recordWithMetadata.setMetadata(8, revSeqNumber);
+ recordWithMetadata.setMetadata(9, lockTime);
+ CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ done = true;
+ core.send(new CloseBucketRequest(bucket)).toBlocking();
+ try {
+ messages.put(CouchbaseReader.POISON_PILL);
+ } catch (InterruptedException e) {
+ LOGGER.warn(e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setController(IDataFlowController controller) {
+ this.controller = (AbstractFeedDataFlowController) controller;
+ }
+
+ public static void set(ByteBuf content, CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
+ CharArrayRecord record) {
+ int position = content.readerIndex();
+ int limit = content.writerIndex();
+ int contentSize = content.capacity();
+ while (position < limit) {
+ bytes.clear();
+ chars.clear();
+ if (contentSize - position < bytes.capacity()) {
+ bytes.limit(contentSize - position);
+ }
+ content.getBytes(position, bytes);
+ position += bytes.position();
+ bytes.flip();
+ decoder.decode(bytes, chars, false);
+ chars.flip();
+ record.append(chars);
+ }
+ record.endRecord();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
new file mode 100644
index 0000000..b9b6f65
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.config.CouchbaseBucketConfig;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+
+import rx.functions.Func1;
+
+public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMetadata<char[]>> {
+
+ private static final long serialVersionUID = 1L;
+ // Constant fields
+ public static final boolean DCP_ENABLED = true;
+ public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
+ public static final int TIMEOUT = 5;
+ public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+ // Dynamic fields
+ private Map<String, String> configuration;
+ private String bucket;
+ private String password = "";
+ private String[] couchbaseNodes;
+ private int numOfVBuckets;
+ private int[] schedule;
+ private String feedName;
+ // Transient fields
+ private transient CouchbaseCore core;
+ private transient Builder builder;
+ private transient DefaultCoreEnvironment env;
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return AsterixClusterProperties.INSTANCE.getClusterLocations();
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ // validate first
+ if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
+ throw new AsterixException("Unspecified bucket");
+ }
+ if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
+ throw new AsterixException("Unspecified Couchbase nodes");
+ }
+ if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
+ password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
+ }
+ this.configuration = configuration;
+ bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
+ couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
+ feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+ builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
+ .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+ env = builder.build();
+ core = new CouchbaseCore(env);
+ getNumberOfVbuckets();
+ schedule();
+ }
+
+ /*
+ * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+ */
+ private void schedule() {
+ schedule = new int[numOfVBuckets];
+ String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+ for (int i = 0; i < numOfVBuckets; i++) {
+ schedule[i] = i % locations.length;
+ }
+ }
+
+ private void getNumberOfVbuckets() {
+ core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+ core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+ numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
+ .map(new Func1<GetClusterConfigResponse, Integer>() {
+ @Override
+ public Integer call(GetClusterConfigResponse response) {
+ CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
+ return config.numberOfPartitions();
+ }
+ }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+ core.send(new CloseBucketRequest(bucket)).toBlocking();
+ }
+
+ @Override
+ public IRecordReader<? extends RecordWithMetadata<char[]>> createRecordReader(IHyracksTaskContext ctx,
+ int partition) throws Exception {
+ String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
+ for (int i = 0; i < schedule.length; i++) {
+ if (schedule[i] == partition) {
+ listOfAssignedVBuckets.add((short) i);
+ }
+ }
+ short[] vbuckets = new short[listOfAssignedVBuckets.size()];
+ for (int i = 0; i < vbuckets.length; i++) {
+ vbuckets[i] = listOfAssignedVBuckets.get(i);
+ }
+ CouchbaseReader reader = new CouchbaseReader(feedName + ":" + nodeName + ":" + partition, bucket, password,
+ couchbaseNodes, vbuckets, ExternalDataUtils.getQueueSize(configuration));
+ reader.configure(configuration);
+ return reader;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<? extends RecordWithMetadata<char[]>> getRecordClass() {
+ RecordWithMetadata<char[]> record = new RecordWithMetadata<char[]>(char[].class);
+ return (Class<? extends RecordWithMetadata<char[]>>) record.getClass();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
deleted file mode 100644
index e9fad25..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.api.ILookupRecordReader;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.input.record.reader.RCLookupReader;
-import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
-import org.apache.asterix.external.input.record.reader.TextLookupReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-
-public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
-
- protected static final long serialVersionUID = 1L;
- protected transient AlgebricksPartitionConstraint clusterLocations;
- protected ConfFactory confFactory;
- protected Map<String, String> configuration;
-
- public HDFSLookupReaderFactory() {
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.RECORDS;
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
- return clusterLocations;
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- this.configuration = configuration;
- JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
- confFactory = new ConfFactory(conf);
-
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
- ExternalFileIndexAccessor snapshotAccessor) throws Exception {
- String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
- JobConf conf = confFactory.getConf();
- FileSystem fs = FileSystem.get(conf);
- switch (inputFormatParameter) {
- case ExternalDataConstants.INPUT_FORMAT_TEXT:
- return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
- case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
- return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
- case ExternalDataConstants.INPUT_FORMAT_RC:
- return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
- default:
- throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
deleted file mode 100644
index 05d419d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.LineRecordReader;
-import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
- String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
- LineRecordReader recordReader;
- if (quoteString != null) {
- recordReader = new QuotedLineRecordReader();
- } else {
- recordReader = new LineRecordReader();
- }
- return configureReader(recordReader, ctx, partition);
- }
-
- @Override
- public Class<? extends char[]> getRecordClass() {
- return char[].class;
- }
-
- @Override
- protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
deleted file mode 100644
index a672f2f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.RSSRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-
-public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
-
- private static final long serialVersionUID = 1L;
- private Map<String, String> configuration;
- private List<String> urls = new ArrayList<String>();
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.RECORDS;
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(urls.size());
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- this.configuration = configuration;
- String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
- if (url == null) {
- throw new IllegalArgumentException("no RSS URL provided");
- }
- initializeURLs(url);
- }
-
- private void initializeURLs(String url) {
- urls.clear();
- String[] rssURLs = url.split(",");
- for (String rssURL : rssURLs) {
- urls.add(rssURL);
- }
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
- @Override
- public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws Exception {
- RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
- reader.configure(configuration);
- return reader;
- }
-
- @Override
- public Class<? extends SyndEntryImpl> getRecordClass() {
- return SyndEntryImpl.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
deleted file mode 100644
index 91b439c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
- SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
- return configureReader(recordReader, ctx, partition);
- }
-
- @Override
- public Class<? extends char[]> getRecordClass() {
- return char[].class;
- }
-
- @Override
- protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
deleted file mode 100644
index 6840c11..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
-import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import twitter4j.Status;
-
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
-
- private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
- private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
-
- private Map<String, String> configuration;
- private boolean pull;
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.RECORDS;
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
- this.configuration = configuration;
- TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
- if (!validateConfiguration(configuration)) {
- StringBuilder builder = new StringBuilder();
- builder.append("One or more parameters are missing from adapter configuration\n");
- builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
- builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
- builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
- builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
- throw new Exception(builder.toString());
- }
- if (ExternalDataUtils.isPull(configuration)) {
- pull = true;
- if (configuration.get(SearchAPIConstants.QUERY) == null) {
- throw new AsterixException(
- "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
- }
- String interval = configuration.get(SearchAPIConstants.INTERVAL);
- if (interval != null) {
- try {
- Integer.parseInt(interval);
- } catch (NumberFormatException nfe) {
- throw new IllegalArgumentException(
- "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
- }
- } else {
- configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
- + DEFAULT_INTERVAL + ")");
- }
- }
- } else if (ExternalDataUtils.isPush(configuration)) {
- pull = false;
- } else {
- throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
- + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
- }
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
- @Override
- public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
- IRecordReader<Status> reader;
- if (pull) {
- reader = new TwitterPullRecordReader();
- } else {
- reader = new TwitterPushRecordReader();
- }
- reader.configure(configuration);
- return reader;
- }
-
- @Override
- public Class<? extends Status> getRecordClass() {
- return Status.class;
- }
-
- private boolean validateConfiguration(Map<String, String> configuration) {
- String consumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
- String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
- String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
- String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
- if (consumerKey == null || consumerSecret == null || accessToken == null || tokenSecret == null) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
new file mode 100644
index 0000000..0627660
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+public abstract class AbstractCharRecordLookupReader extends AbstractHDFSLookupRecordReader<char[]> {
+ public AbstractCharRecordLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+ Configuration conf) {
+ super(snapshotAccessor, fs, conf);
+ }
+
+ protected CharArrayRecord record = new CharArrayRecord();
+ protected Text value = new Text();
+ protected CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+ protected ByteBuffer reusableByteBuffer = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ protected CharBuffer reusableCharBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+
+ @Override
+ public Class<?> getRecordClass() throws IOException {
+ return char[].class;
+ }
+
+ @Override
+ protected IRawRecord<char[]> lookup(RecordId rid) throws IOException {
+ record.reset();
+ readRecord(rid);
+ writeRecord();
+ return record;
+ }
+
+ protected abstract void readRecord(RecordId rid) throws IOException;
+
+ private void writeRecord() {
+ reusableByteBuffer.clear();
+ if (reusableByteBuffer.remaining() < value.getLength()) {
+ reusableByteBuffer = ByteBuffer
+ .allocateDirect(value.getLength() + ExternalDataConstants.DEFAULT_BUFFER_INCREMENT);
+ }
+ reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
+ reusableByteBuffer.flip();
+ while (reusableByteBuffer.hasRemaining()) {
+ reusableCharBuffer.clear();
+ decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
+ reusableCharBuffer.flip();
+ record.append(reusableCharBuffer);
+ }
+ record.endRecord();
+ }
+}