You are viewing a plain-text version of this content. The hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:15 UTC
[17/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
deleted file mode 100644
index 64c62f7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Factory class for creating an instance of NCFileSystemAdapter. An
- * NCFileSystemAdapter reads external data residing on the local file system of
- * an NC.
- */
-public class NCFileSystemAdapter extends FileSystemBasedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private final FileSplit[] fileSplits;
-
- public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
- IHyracksTaskContext ctx) throws HyracksDataException {
- super(parserFactory, atype, ctx);
- this.fileSplits = fileSplits;
- }
-
- @Override
- public InputStream getInputStream(int partition) throws IOException {
- FileSplit split = fileSplits[partition];
- File inputFile = split.getLocalFile().getFile();
- InputStream in;
- try {
- in = new FileInputStream(inputFile);
- return in;
- } catch (FileNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public String getFilename(int partition) {
- final FileSplit fileSplit = fileSplits[partition];
- return fileSplit.getNodeName() + ":" + fileSplit.getLocalFile().getFile().getPath();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
deleted file mode 100644
index d6b4ba7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.external.feeds.IPullBasedFeedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Acts as an abstract class for all pull-based external data adapters. Captures
- * the common logic for obtaining bytes from an external source and packing them
- * into frames as tuples.
- */
-public abstract class PullBasedAdapter implements IPullBasedFeedAdapter {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
- private static final int timeout = 5; // seconds
-
- protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
- protected IPullBasedFeedClient pullBasedFeedClient;
- protected ARecordType adapterOutputType;
- protected boolean continueIngestion = true;
- protected Map<String, String> configuration;
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private long tupleCount = 0;
- private final IHyracksTaskContext ctx;
- private int frameTupleCount = 0;
-
- protected FeedPolicyEnforcer policyEnforcer;
-
- public FeedPolicyEnforcer getPolicyEnforcer() {
- return policyEnforcer;
- }
-
- public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
- this.policyEnforcer = policyEnforcer;
- }
-
- public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
-
- public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
- this.ctx = ctx;
- this.configuration = configuration;
- }
-
- public long getIngestedRecordsCount() {
- return tupleCount;
- }
-
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- frame = new VSizeFrame(ctx);
- appender = new FrameTupleAppender(frame);
-
- pullBasedFeedClient = getFeedClient(partition);
- InflowState inflowState = null;
-
- while (continueIngestion) {
- tupleBuilder.reset();
- try {
- // blocking call
- inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
- switch (inflowState) {
- case DATA_AVAILABLE:
- tupleBuilder.addFieldEndOffset();
- appendTupleToFrame(writer);
- frameTupleCount++;
- break;
- case NO_MORE_DATA:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Reached end of feed");
- }
- appender.flush(writer, true);
- tupleCount += frameTupleCount;
- frameTupleCount = 0;
- continueIngestion = false;
- break;
- case DATA_NOT_AVAILABLE:
- if (frameTupleCount > 0) {
- appender.flush(writer, true);
- tupleCount += frameTupleCount;
- frameTupleCount = 0;
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
- }
- break;
- }
-
- } catch (Exception failureException) {
- try {
- failureException.printStackTrace();
- boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
- if (continueIngestion) {
- tupleBuilder.reset();
- continue;
- } else {
- throw failureException;
- }
- } catch (Exception recoveryException) {
- throw new Exception(recoveryException);
- }
- }
- }
- }
-
- private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
- /**
- * Discontinue the ingestion of data and end the feed.
- *
- * @throws Exception
- */
- public void stop() throws Exception {
- continueIngestion = false;
- }
-
- public Map<String, String> getConfiguration() {
- return configuration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
deleted file mode 100644
index 985399b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
-import com.microsoft.windowsazure.services.table.client.CloudTableClient;
-import com.microsoft.windowsazure.services.table.client.TableConstants;
-import com.microsoft.windowsazure.services.table.client.TableQuery;
-import com.microsoft.windowsazure.services.table.client.TableQuery.Operators;
-import com.microsoft.windowsazure.services.table.client.TableQuery.QueryComparisons;
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.ResettableByteArrayOutputStream;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-
-public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
- private static final Logger LOGGER = Logger.getLogger(PullBasedAzureFeedClient.class.getName());
-
- private final String tableName;
- private final ARecordType outputType;
- private final CloudTableClient ctc;
- private final TableQuery<? extends TableServiceEntity> tableQuery;
- private Iterator<? extends TableServiceEntity> entityIt;
-
- private final Pattern arrayPattern = Pattern.compile("\\[(?<vals>.*)\\]");
- private final Pattern int32Pattern = Pattern.compile(":(?<int>\\d+)(,|})");
- private final Pattern doubleWithEndingZeroPattern = Pattern.compile("\\d+\\.(?<zero>0)(,|})");
-
- private final ResettableByteArrayOutputStream rbaos;
- private final DataOutputStream dos;
- private final ADMDataParser adp;
- private final ByteArrayAccessibleInputStream baais;
-
- public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName, String lowKey,
- String highKey) throws AsterixException {
- this.tableName = tableName;
- this.outputType = outputType;
- this.tableQuery = configureTableQuery(tableName, lowKey, highKey);
- this.ctc = csa.createCloudTableClient();
- rbaos = new ResettableByteArrayOutputStream();
- dos = new DataOutputStream(rbaos);
- baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
- adp = new ADMDataParser();
- adp.initialize(baais, outputType, false);
- }
-
- private TableQuery<? extends TableServiceEntity> configureTableQuery(String tableName, String lowKey, String highKey) {
- TableQuery<? extends TableServiceEntity> baseTQ = TableQuery.from(tableName, classFromString(tableName));
- if (lowKey != null && highKey != null) {
- String lowKeyPredicate = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
- QueryComparisons.GREATER_THAN_OR_EQUAL, lowKey);
- String highKeyPredicate = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
- QueryComparisons.LESS_THAN_OR_EQUAL, highKey);
- String partitionPredicate = TableQuery.combineFilters(lowKeyPredicate, Operators.AND, highKeyPredicate);
- return baseTQ.where(partitionPredicate);
- }
-
- return baseTQ;
- }
-
- private Class<? extends TableServiceEntity> classFromString(String tableName) {
- return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
- }
-
- @Override
- public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
- if (entityIt == null) {
- entityIt = ctc.execute(tableQuery).iterator();
- }
-
- boolean moreTweets = entityIt.hasNext();
- if (moreTweets) {
- String json = null;
- try {
- json = getJSONString();
- byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
- rbaos.reset();
- dos.write(jsonBytes, 0, jsonBytes.length);
- dos.flush();
- baais.setContent(rbaos.getByteArray(), 0, jsonBytes.length);
- adp.initialize(baais, outputType, false);
- adp.parse(dataOutput);
- } catch (Exception e) {
- if (json != null) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Record in error: " + json);
- }
- }
- e.printStackTrace();
- throw new AsterixException(e);
- }
- }
- return moreTweets ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
- }
-
- private String getJSONString() throws JSONException {
- if (tableName.equals("Postings")) {
- AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
- JSONObject tjo = new JSONObject(tweet.getJSON().toString());
- tjo.put("posting_id", tweet.getRowKey());
- tjo.put("user_id", tweet.getPartitionKey());
- tjo.remove("id");
- JSONObject utjo = tjo.getJSONObject("user");
- utjo.remove("id");
- tjo.put("user", utjo);
- return tjo.toString();
- } else if (tableName.equals("PostingMetadata")) {
- AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
- JSONObject tmdjo = new JSONObject();
- tmdjo.put("posting_id", tweetMD.getRowKey());
- tmdjo.put("user_id", tweetMD.getPartitionKey());
- tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
- tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
- List<Integer> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
- tmdjo.put("product_id", productIdList);
- if (tweetMD.getEthnicity() != null) {
- tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
- }
- if (tweetMD.getGender() != null) {
- tmdjo.put("gender", new JSONObject(stripTillColon(tweetMD.getGender())));
- }
- if (tweetMD.getLocation() != null) {
- String locStr = stripTillColon(tweetMD.getLocation());
- Matcher m = int32Pattern.matcher(locStr);
- while (m.find()) {
- locStr = locStr.replace(m.group("int"), m.group("int") + ".01");
- }
- m = doubleWithEndingZeroPattern.matcher(locStr);
- while (m.find()) {
- locStr = locStr.replace(m.group("zero"), "01");
- }
- tmdjo.put("location", new JSONObject(locStr));
- }
- if (tweetMD.getSentiment() != null) {
- tmdjo.put("sentiment", stripTillColon(tweetMD.getSentiment()));
- }
- return tmdjo.toString();
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- private String stripTillColon(String str) {
- return str.substring(str.indexOf(':') + 1);
- }
-
- private Integer[] extractArray(String str) {
- Matcher m = arrayPattern.matcher(str);
- m.find();
- String[] stringNums = m.group("vals").replaceAll("\\s", "").split(",");
- Integer[] nums = new Integer[stringNums.length];
- for (int i = 0; i < nums.length; ++i) {
- nums[i] = Integer.parseInt(stringNums[i]);
- }
- return nums;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
deleted file mode 100644
index e8cacde..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
- private static final Logger LOGGER = Logger.getLogger(PullBasedAzureTwitterAdapter.class.getName());
-
- private static final long serialVersionUID = 1L;
-
- private final CloudStorageAccount csa;
- private final String connectionString;
- private final String azureAccountName;
- private final String azureAccountKey;
- private final ARecordType outputType;
- private final String tableName;
- private final boolean partitioned;
-
- private String[] lowKeys;
- private String[] highKeys;
-
- public PullBasedAzureTwitterAdapter(String accountName, String accountKey, String tableName, String[] partitions,
- Map<String, String> configuration, IHyracksTaskContext ctx, ARecordType outputType) throws AsterixException {
- super(configuration, ctx);
- this.outputType = outputType;
- if (partitions != null) {
- partitioned = true;
- configurePartitions(partitions);
- } else {
- partitioned = false;
- }
- this.azureAccountName = accountName;
- this.azureAccountKey = accountKey;
- this.tableName = tableName;
-
- connectionString = "DefaultEndpointsProtocol=http;" + "AccountName=" + azureAccountName + ";AccountKey="
- + azureAccountKey + ";";
- try {
- csa = CloudStorageAccount.parse(connectionString);
- } catch (InvalidKeyException | URISyntaxException e) {
- throw new AsterixException("You must specify a valid Azure account name and key", e);
- }
- }
-
- private void configurePartitions(String[] partitions) {
- lowKeys = new String[partitions.length];
- highKeys = new String[partitions.length];
- for (int i = 0; i < partitions.length; ++i) {
- String[] loHi = partitions[i].split(":");
- lowKeys[i] = loHi[0];
- highKeys[i] = loHi[1];
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Partition " + i + " configured for keys " + lowKeys[i] + " to " + highKeys[i]);
- }
- }
- }
-
- @Override
- public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
- if (partitioned) {
- return new PullBasedAzureFeedClient(csa, outputType, tableName, lowKeys[partition], highKeys[partition]);
- }
- return new PullBasedAzureFeedClient(csa, outputType, tableName, null, null);
- }
-
- @Override
- public DataExchangeMode getDataExchangeMode() {
- return DataExchangeMode.PULL;
- }
-
- @Override
- public boolean handleException(Exception e) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
deleted file mode 100644
index 90281b7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An adapter that provides the functionality of receiving tweets from the
- * Twitter service in the form of ADM formatted records.
- */
-public class PullBasedTwitterAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private static final int DEFAULT_BATCH_SIZE = 5;
-
- private ARecordType recordType;
- private PullBasedTwitterFeedClient tweetClient;
-
- @Override
- public IFeedClient getFeedClient(int partition) {
- return tweetClient;
- }
-
- public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
- throws AsterixException {
- super(configuration, ctx);
- tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
- }
-
- public ARecordType getAdapterOutputType() {
- return recordType;
- }
-
- @Override
- public DataExchangeMode getDataExchangeMode() {
- return DataExchangeMode.PULL;
- }
-
- @Override
- public boolean handleException(Exception e) {
- return true;
- }
-
- @Override
- public ITupleForwardPolicy getTupleParserPolicy() {
- configuration.put(ITupleForwardPolicy.PARSER_POLICY,
- ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
- String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
- if (propValue == null) {
- configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
- }
- return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
deleted file mode 100644
index 8b5e1e1..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.List;
-import java.util.Map;
-
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An implementation of @see {PullBasedFeedClient} for the Twitter service. The
- * feed client fetches data from Twitter service by sending request at regular
- * (configurable) interval.
- */
-public class PullBasedTwitterFeedClient extends FeedClient {
-
- private String keywords;
- private Query query;
- private Twitter twitter;
- private int requestInterval = 5; // seconds
- private QueryResult result;
-
- private ARecordType recordType;
- private int nextTweetIndex = 0;
- private long lastTweetIdReceived = 0;
- private TweetProcessor tweetProcessor;
-
- public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
- this.twitter = TwitterUtil.getTwitterService(adapter.getConfiguration());
- this.recordType = recordType;
- this.tweetProcessor = new TweetProcessor(recordType);
- this.recordSerDe = new ARecordSerializerDeserializer(recordType);
- this.mutableRecord = tweetProcessor.getMutableRecord();
- this.initialize(adapter.getConfiguration());
- }
-
- public ARecordType getRecordType() {
- return recordType;
- }
-
- @Override
- public InflowState retrieveNextRecord() throws Exception {
- Status tweet;
- tweet = getNextTweet();
- if (tweet == null) {
- return InflowState.DATA_NOT_AVAILABLE;
- }
-
- tweetProcessor.processNextTweet(tweet);
- return InflowState.DATA_AVAILABLE;
- }
-
- private void initialize(Map<String, String> params) {
- this.keywords = (String) params.get(SearchAPIConstants.QUERY);
- this.requestInterval = Integer.parseInt((String) params.get(SearchAPIConstants.INTERVAL));
- this.query = new Query(keywords);
- this.query.setCount(100);
- }
-
- private Status getNextTweet() throws TwitterException, InterruptedException {
- if (result == null || nextTweetIndex >= result.getTweets().size()) {
- Thread.sleep(1000 * requestInterval);
- query.setSinceId(lastTweetIdReceived);
- result = twitter.search(query);
- nextTweetIndex = 0;
- }
- if (result != null && !result.getTweets().isEmpty()) {
- List<Status> tw = result.getTweets();
- Status tweet = tw.get(nextTweetIndex++);
- if (lastTweetIdReceived < tweet.getId()) {
- lastTweetIdReceived = tweet.getId();
- }
- return tweet;
- } else {
- return null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
deleted file mode 100644
index 01839d3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Push-mode feed adapter for Twitter. A single {@link PushBasedTwitterFeedClient} is created in the
- * constructor and returned for every partition.
- */
-public class PushBasedTwitterAdapter extends ClientBasedFeedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- // Batch size used by getTupleParserPolicy() when the configuration does not specify one.
- private static final int DEFAULT_BATCH_SIZE = 50;
-
- private PushBasedTwitterFeedClient tweetClient;
-
- public PushBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
- super(configuration, ctx);
- // NOTE(review): super(configuration, ctx) already receives the map; this reassignment looks redundant —
- // confirm against ClientBasedFeedAdapter.
- this.configuration = configuration;
- // The client's constructor registers its listener and calls filter()/sample() on the Twitter stream.
- this.tweetClient = new PushBasedTwitterFeedClient(ctx, recordType, this);
- }
-
- @Override
- public DataExchangeMode getDataExchangeMode() {
- return DataExchangeMode.PUSH;
- }
-
- // Returns true for every exception. NOTE(review): confirm the meaning of the return value in
- // ClientBasedFeedAdapter before relying on this.
- @Override
- public boolean handleException(Exception e) {
- return true;
- }
-
- // The partition is ignored: every partition receives the same client instance.
- @Override
- public IFeedClient getFeedClient(int partition) throws Exception {
- return tweetClient;
- }
-
- /**
- * Forces the COUNTER_TIMER_EXPIRED tuple-forward policy and, when no batch size is configured, sets it to
- * DEFAULT_BATCH_SIZE. Note that both values are written into the adapter's configuration map.
- */
- @Override
- public ITupleForwardPolicy getTupleParserPolicy() {
- configuration.put(ITupleForwardPolicy.PARSER_POLICY,
- ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
- String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
- if (propValue == null) {
- configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
- }
- return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
deleted file mode 100644
index bb40ac9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-
-/**
- * Push-based feed client for the Twitter streaming API. The constructor registers a {@link StatusListener}
- * on a {@link TwitterStream} and calls filter(query) when a filter query is configured, sample() otherwise.
- * Received tweets are buffered in a {@link LinkedBlockingQueue} (created without a capacity bound) and
- * handed out one at a time by {@link #retrieveNextRecord()}.
- */
-public class PushBasedTwitterFeedClient extends FeedClient {
-
- private ARecordType recordType;
- private TweetProcessor tweetProcessor;
- private LinkedBlockingQueue<Status> inputQ;
-
- // NOTE(review): the ctx parameter is unused, and the stream opened here is never shut down in this class.
- public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter)
- throws AsterixException {
- this.recordType = recordType;
- this.tweetProcessor = new TweetProcessor(recordType);
- this.recordSerDe = new ARecordSerializerDeserializer(recordType);
- this.mutableRecord = tweetProcessor.getMutableRecord();
- this.inputQ = new LinkedBlockingQueue<Status>();
- TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
- twitterStream.addListener(new TweetListener(inputQ));
- FilterQuery query = TwitterUtil.getFilterQuery(adapter.getConfiguration());
- if (query != null) {
- twitterStream.filter(query);
- } else {
- twitterStream.sample();
- }
- }
-
- public ARecordType getRecordType() {
- return recordType;
- }
-
- /**
- * Stream listener that enqueues every received status; all other callbacks do nothing.
- */
- private class TweetListener implements StatusListener {
-
- private LinkedBlockingQueue<Status> inputQ;
-
- public TweetListener(LinkedBlockingQueue<Status> inputQ) {
- this.inputQ = inputQ;
- }
-
- @Override
- public void onStatus(Status tweet) {
- inputQ.add(tweet);
- }
-
- @Override
- public void onException(Exception arg0) {
- // NOTE(review): stream errors are discarded silently; consider at least logging them.
-
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice arg0) {
- }
-
- @Override
- public void onScrubGeo(long arg0, long arg1) {
- }
-
- @Override
- public void onStallWarning(StallWarning arg0) {
- }
-
- @Override
- public void onTrackLimitationNotice(int arg0) {
- }
- }
-
- /**
- * Blocks until a tweet is available, then hands it to the tweet processor.
- *
- * @return always DATA_AVAILABLE; this client never reports DATA_NOT_AVAILABLE
- */
- @Override
- public InflowState retrieveNextRecord() throws Exception {
- Status tweet = inputQ.take();
- tweetProcessor.processNextTweet(tweet);
- return InflowState.DATA_AVAILABLE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
deleted file mode 100644
index 69cd82c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Pull-mode feed adapter for RSS feeds. The comma-separated "url" configuration entry lists the feed URLs;
- * records are produced by an {@link RSSFeedClient}.
- */
-public class RSSFeedAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private static final String KEY_RSS_URL = "url";
-
- private List<String> feedURLs = new ArrayList<String>();
- // Node id of the task's application context; used as the prefix of generated record ids.
- private String id_prefix = "";
-
- private IFeedClient rssFeedClient;
-
- private ARecordType recordType;
-
- // NOTE(review): calls the overridable reconfigure() from the constructor.
- public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
- throws AsterixException {
- super(configuration, ctx);
- id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
- this.recordType = recordType;
- reconfigure(configuration);
- }
-
- // Replaces feedURLs with the comma-separated entries of rssURLProperty (entries are not trimmed).
- private void initializeFeedURLs(String rssURLProperty) {
- feedURLs.clear();
- String[] feedURLProperty = rssURLProperty.split(",");
- for (String feedURL : feedURLProperty) {
- feedURLs.add(feedURL);
- }
- }
-
- // NOTE(review): ignores the arguments parameter and reads the inherited configuration field instead;
- // confirm that field is already populated when this runs from the constructor.
- protected void reconfigure(Map<String, String> arguments) {
- String rssURLProperty = configuration.get(KEY_RSS_URL);
- if (rssURLProperty != null) {
- initializeFeedURLs(rssURLProperty);
- }
- }
-
- // NOTE(review): the first client created is cached and returned for every later partition, so only the URL
- // of the first requested partition is ever read. feedURLs.get(partition) throws IndexOutOfBoundsException
- // when partition >= the number of configured URLs (including when no "url" entry was configured).
- @Override
- public IFeedClient getFeedClient(int partition) throws Exception {
- if (rssFeedClient == null) {
- rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
- }
- return rssFeedClient;
- }
-
- public ARecordType getRecordType() {
- return recordType;
- }
-
- @Override
- public DataExchangeMode getDataExchangeMode() {
- return DataExchangeMode.PULL;
- }
-
- // Returns false for every exception.
- @Override
- public boolean handleException(Exception e) {
- return false;
- }
-
- @Override
- public ITupleForwardPolicy getTupleParserPolicy() {
- return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
deleted file mode 100644
index 0b0d0fb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/RSSFeedClient.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.fetcher.FeedFetcher;
-import com.sun.syndication.fetcher.FetcherEvent;
-import com.sun.syndication.fetcher.FetcherListener;
-import com.sun.syndication.fetcher.impl.FeedFetcherCache;
-import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
-import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
-
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-
-/**
- * Feed client that reads entries from a single RSS feed URL through a {@link FeedFetcher}. Fetched entries are
- * buffered and emitted one at a time as records of four string fields: id, title, description and link.
- * A new fetch is issued whenever the buffer is empty; this class itself applies no delay between fetches.
- */
-@SuppressWarnings("rawtypes")
-public class RSSFeedClient extends FeedClient {
-
- // Per-client record counter, appended to idPrefix to form record ids.
- private long id = 0;
- private String idPrefix;
- // Set to true by FetcherEventListenerImpl and checked by fetchFeed().
- // NOTE(review): nothing in this class resets it to false, so once it is true every fetchFeed() re-adds all
- // entries returned by the fetcher, which can emit duplicate records.
- private boolean feedModified = false;
-
- private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
-
- // Four reusable string slots, filled in retrieveNextRecord() as id, title, description, link.
- IAObject[] mutableFields;
-
- private final FeedFetcherCache feedInfoCache;
- private final FeedFetcher fetcher;
- private final FetcherEventListenerImpl listener;
- private final URL feedUrl;
- private ARecordType recordType;
- String[] tupleFieldValues;
-
- public boolean isFeedModified() {
- return feedModified;
- }
-
- public void setFeedModified(boolean feedModified) {
- this.feedModified = feedModified;
- }
-
- /**
- * @param adapter supplies the record type of the produced records
- * @param feedURL URL of the RSS feed to read
- * @param id_prefix prefix of generated record ids
- * @throws MalformedURLException if feedURL is not a valid URL
- * NOTE(review): mutableFields always has four slots while tupleFieldValues and the copy loop in
- * retrieveNextRecord() are sized from the record type; a record type with other than four fields fails
- * with ArrayIndexOutOfBoundsException.
- */
- public RSSFeedClient(RSSFeedAdapter adapter, String feedURL, String id_prefix) throws MalformedURLException {
- this.idPrefix = id_prefix;
- this.feedUrl = new URL(feedURL);
- feedInfoCache = HashMapFeedInfoCache.getInstance();
- fetcher = new HttpURLFeedFetcher(feedInfoCache);
- listener = new FetcherEventListenerImpl(this);
- fetcher.addFetcherEventListener(listener);
- mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
- new AMutableString(null) };
- recordType = adapter.getRecordType();
- mutableRecord = new AMutableRecord(recordType, mutableFields);
- tupleFieldValues = new String[recordType.getFieldNames().length];
- }
-
- /**
- * Fills the mutable record from the next buffered feed entry, using "idPrefix:counter" as the id.
- *
- * @return DATA_NOT_AVAILABLE when no entry could be obtained, otherwise DATA_AVAILABLE
- */
- @Override
- public InflowState retrieveNextRecord() throws Exception {
- SyndEntryImpl feedEntry = getNextRSSFeed();
- if (feedEntry == null) {
- return InflowState.DATA_NOT_AVAILABLE;
- }
- tupleFieldValues[0] = idPrefix + ":" + id;
- tupleFieldValues[1] = feedEntry.getTitle();
- // NOTE(review): getDescription() is dereferenced without a null check.
- tupleFieldValues[2] = feedEntry.getDescription().getValue();
- tupleFieldValues[3] = feedEntry.getLink();
- int numFields = recordType.getFieldNames().length;
- for (int i = 0; i < numFields; i++) {
- ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
- mutableRecord.setValueAtPos(i, mutableFields[i]);
- }
- id++;
- return InflowState.DATA_AVAILABLE;
- }
-
- // Returns the next buffered entry, fetching the feed first when the buffer is empty; null if still empty.
- private SyndEntryImpl getNextRSSFeed() throws Exception {
- if (rssFeedBuffer.isEmpty()) {
- fetchFeed();
- }
- if (rssFeedBuffer.isEmpty()) {
- return null;
- } else {
- return rssFeedBuffer.remove();
- }
- }
-
- // Retrieves the feed and, when feedModified is set, appends all its entries to the buffer. Every exception
- // is caught and printed, leaving the buffer unchanged (the caller then reports DATA_NOT_AVAILABLE).
- @SuppressWarnings("unchecked")
- private void fetchFeed() {
- try {
- // Retrieve the feed.
- // We will get a Feed Polled Event and then a
- // Feed Retrieved event (assuming the feed is valid)
- SyndFeed feed = fetcher.retrieveFeed(feedUrl);
- if (feedModified) {
- System.err.println(feedUrl + " retrieved");
- System.err.println(feedUrl + " has a title: " + feed.getTitle() + " and contains "
- + feed.getEntries().size() + " entries.");
-
- List fetchedFeeds = feed.getEntries();
- rssFeedBuffer.addAll(fetchedFeeds);
- }
- } catch (Exception ex) {
- System.out.println("ERROR: " + ex.getMessage());
- ex.printStackTrace();
- }
- }
-
-}
-
-/**
- * Fetcher listener that logs fetcher events to stderr and flags the owning RSSFeedClient as modified.
- */
-class FetcherEventListenerImpl implements FetcherListener {
-
- // NOTE(review): fetcherEvent() casts this to RSSFeedClient; any other IFeedClient causes ClassCastException.
- private final IFeedClient feedClient;
-
- public FetcherEventListenerImpl(IFeedClient feedClient) {
- this.feedClient = feedClient;
- }
-
- /**
- * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
- * NOTE(review): this implements an interface method but lacks @Override.
- */
- public void fetcherEvent(FetcherEvent event) {
- String eventType = event.getEventType();
- if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Polled. URL = " + event.getUrlString());
- } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
- ((RSSFeedClient) feedClient).setFeedModified(true);
- } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
- // NOTE(review): an unchanged feed is also flagged as modified, so fetchFeed() re-buffers its entries;
- // this looks like it was meant to be setFeedModified(false) — confirm.
- ((RSSFeedClient) feedClient).setFeedModified(true);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
index b436177..3f10dc4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -23,7 +23,7 @@ import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,7 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-public abstract class StreamBasedAdapter implements IDatasourceAdapter {
+public abstract class StreamBasedAdapter implements IDataSourceAdapter {
private static final long serialVersionUID = 1L;
@@ -43,8 +43,8 @@ public abstract class StreamBasedAdapter implements IDatasourceAdapter {
protected final IAType sourceDatatype;
- public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
+ /**
+ * Creates this task's tuple parser from {@code parserFactory} and records the source datatype.
+ * NOTE(review): the {@code partition} argument is not stored by this constructor; confirm that the
+ * {@code partition} value used in this class's warning message is assigned somewhere else.
+ */
+ public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx,
+ int partition) throws HyracksDataException {
this.tupleParser = parserFactory.createTupleParser(ctx);
this.sourceDatatype = sourceDatatype;
}
@@ -56,7 +56,8 @@ public abstract class StreamBasedAdapter implements IDatasourceAdapter {
tupleParser.parse(in, writer);
} else {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
+ LOGGER.warning(
+ "Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
deleted file mode 100644
index 62052af..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-
-/**
- * An {@link IFeedAdapter} that exposes a get/set pair for a {@link FeedPolicyEnforcer}.
- */
-public interface IPullBasedFeedAdapter extends IFeedAdapter {
-
- /**
- * @return the feed policy enforcer held by this adapter (as defined by the implementation)
- */
- public FeedPolicyEnforcer getPolicyEnforcer();
-
- /**
- * @param feedPolicyEnforcer the feed policy enforcer this adapter should hold
- */
- public void setFeedPolicyEnforcer(FeedPolicyEnforcer feedPolicyEnforcer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
index 3988f1a..533d119 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
@@ -40,6 +40,16 @@ public class ExternalFile implements Serializable, Comparable<ExternalFile> {
private int fileNumber;
private ExternalFilePendingOp pendingOp;
+ /**
+ * Creates a placeholder entry with empty dataverse, dataset and file names, file number -1, size 0,
+ * no pending operation, and a last-modified time equal to the moment of construction.
+ */
+ public ExternalFile() {
+ this.fileName = "";
+ this.dataverseName = "";
+ this.datasetName = "";
+ this.fileNumber = -1;
+ this.size = 0;
+ this.lastModefiedTime = new Date();
+ this.pendingOp = ExternalFilePendingOp.PENDING_NO_OP;
+ }
+
public ExternalFile(String dataverseName, String datasetName, int fileNumber, String fileName,
Date lastModefiedTime, long size, ExternalFilePendingOp pendingOp) {
this.dataverseName = dataverseName;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index b10379b..d94db08 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -24,7 +24,7 @@ import java.io.DataInputStream;
import java.io.Serializable;
import java.util.Date;
-import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AInt64;
@@ -57,7 +57,7 @@ public class ExternalFileIndexAccessor implements Serializable {
private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
private static final long serialVersionUID = 1L;
private ExternalBTreeDataflowHelper indexDataflowHelper;
- private ExternalLoopkupOperatorDiscriptor opDesc;
+ private ExternalLookupOperatorDescriptor opDesc;
private IHyracksTaskContext ctx;
private ExternalBTree index;
@@ -72,39 +72,34 @@ public class ExternalFileIndexAccessor implements Serializable {
private IIndexCursor fileIndexSearchCursor;
+ /**
+ * @param indexDataflowHelper helper used to open, obtain and close the files index
+ * @param opDesc lookup operator descriptor; its search operation callback factory is used when the index is
+ * opened
+ */
public ExternalFileIndexAccessor(ExternalBTreeDataflowHelper indexDataflowHelper,
- ExternalLoopkupOperatorDiscriptor opDesc) {
+ ExternalLookupOperatorDescriptor opDesc) {
this.indexDataflowHelper = indexDataflowHelper;
this.opDesc = opDesc;
}
- public void openIndex() throws HyracksDataException {
+ /**
+ * Opens the files index and prepares the search key, search predicate, accessor and cursor used by
+ * {@link #lookup(int, ExternalFile)}. If anything fails after the dataflow helper has been opened, the helper
+ * is closed again and the accessor state is cleared before the failure is rethrown, so a failed open() does
+ * not leave the index open.
+ *
+ * @throws HyracksDataException if the index cannot be opened or the accessor/cursor cannot be created
+ */
+ public void open() throws HyracksDataException {
// Open the index and get the instance
indexDataflowHelper.open();
- index = (ExternalBTree) indexDataflowHelper.getIndexInstance();
try {
+ index = (ExternalBTree) indexDataflowHelper.getIndexInstance();
// Create search key and search predicate objects
searchKey = new ArrayTupleReference();
searchKeyTupleBuilder = new ArrayTupleBuilder(FilesIndexDescription.FILE_KEY_SIZE);
searchKeyTupleBuilder.reset();
searchKeyTupleBuilder.addField(intSerde, currentFileNumber);
searchKey.reset(searchKeyTupleBuilder.getFieldEndOffsets(), searchKeyTupleBuilder.getByteArray());
searchCmp = BTreeUtils.getSearchMultiComparator(index.getComparatorFactories(), searchKey);
searchPredicate = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);

// create the accessor and the cursor using the passed version
+ // NOTE(review): ctx is not assigned by the constructor; confirm it is set before open() is called.
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx);
fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
} catch (Exception e) {
+ // Clear the state so that a later close() does not release the helper a second time.
+ index = null;
+ fileIndexSearchCursor = null;
indexDataflowHelper.close();
throw new HyracksDataException(e);
}
}
- public void searchForFile(int fileNumber, ExternalFile file) throws Exception {
+ public void lookup(int fileId, ExternalFile file) throws Exception {
// Set search parameters
- currentFileNumber.setValue(fileNumber);
+ currentFileNumber.setValue(fileId);
searchKeyTupleBuilder.reset();
searchKeyTupleBuilder.addField(intSerde, currentFileNumber);
searchKey.reset(searchKeyTupleBuilder.getFieldEndOffsets(), searchKeyTupleBuilder.getByteArray());
@@ -122,14 +117,14 @@ public class ExternalFileIndexAccessor implements Serializable {
ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
DataInput in = new DataInputStream(stream);
ARecord externalFileRecord = (ARecord) filesIndexDescription.EXTERNAL_FILE_RECORD_SERDE.deserialize(in);
- setExternalFileFromARecord(externalFileRecord, file);
+ setFile(externalFileRecord, file);
} else {
// This should never happen
throw new HyracksDataException("Was not able to find a file in the files index");
}
}
- private void setExternalFileFromARecord(ARecord externalFileRecord, ExternalFile file) {
+ private void setFile(ARecord externalFileRecord, ExternalFile file) {
file.setFileName(
((AString) externalFileRecord.getValueByPos(FilesIndexDescription.EXTERNAL_FILE_NAME_FIELD_INDEX))
.getStringValue());
@@ -140,11 +135,13 @@ public class ExternalFileIndexAccessor implements Serializable {
.getChrononTime())));
}
- public void closeIndex() throws HyracksDataException {
- try {
- fileIndexSearchCursor.close();
- } finally {
- indexDataflowHelper.close();
+ /**
+ * Closes the search cursor and the files index opened by {@link #open()}. Does nothing when the index is not
+ * open; a second call after a completed close is therefore a no-op.
+ *
+ * @throws HyracksDataException if closing the cursor or the dataflow helper fails
+ */
+ public void close() throws HyracksDataException {
+ if (index == null) {
+ return;
+ }
+ try {
+ // The cursor is absent when open() failed after the index instance was obtained.
+ if (fileIndexSearchCursor != null) {
+ fileIndexSearchCursor.close();
+ }
+ } finally {
+ // Clear the state before releasing the helper so a repeated close() cannot release it twice.
+ index = null;
+ fileIndexSearchCursor = null;
+ indexDataflowHelper.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
new file mode 100644
index 0000000..fa22179
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileIndexTupleTranslator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Converts {@link ExternalFile} descriptions into tuples for the external files index: a file-number field
+ * followed by a record of type {@code EXTERNAL_FILE_RECORD_TYPE} holding the file name, size and last-modified
+ * time.
+ * <p>
+ * The builders, mutable values and the returned tuple are reused on every call without synchronization, so an
+ * instance must not be shared between threads and a returned tuple is only valid until the next call.
+ */
+public class FileIndexTupleTranslator {
+ private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+ private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+ filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
+ private final RecordBuilder recordBuilder = new RecordBuilder();
+ private final ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ private final AMutableInt32 aInt32 = new AMutableInt32(0);
+ private final AMutableInt64 aInt64 = new AMutableInt64(0);
+ private final AMutableString aString = new AMutableString(null);
+ private final AMutableDateTime aDateTime = new AMutableDateTime(0);
+ // Each serde assignment is an unchecked conversion, so the suppression is scoped to the field itself.
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<IAObject> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<IAObject> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADATETIME);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ private final ArrayTupleReference tuple = new ArrayTupleReference();
+
+ /**
+ * Builds the files-index tuple for {@code file}.
+ *
+ * @param file the file to translate; its last-modified time must be non-null
+ * @return a tuple backed by this translator's internal buffers, valid until the next call
+ * @throws IOException propagated from field serialization or record writing
+ * @throws AsterixException propagated from record building
+ */
+ @SuppressWarnings("unchecked") // the record descriptor's field serdes are raw types
+ public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException {
+ tupleBuilder.reset();
+ //File Number
+ aInt32.setValue(file.getFileNumber());
+ filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
+ tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ //File Record
+ recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE);
+ // write field 0 (File Name)
+ fieldValue.reset();
+ aString.setValue(file.getFileName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(0, fieldValue);
+
+ //write field 1 (File Size)
+ fieldValue.reset();
+ aInt64.setValue(file.getSize());
+ longSerde.serialize(aInt64, fieldValue.getDataOutput());
+ recordBuilder.addField(1, fieldValue);
+
+ //write field 2 (File Mod Date)
+ fieldValue.reset();
+ aDateTime.setValue(file.getLastModefiedTime().getTime());
+ dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
+ recordBuilder.addField(2, fieldValue);
+
+ //write the record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ tupleBuilder.addFieldEndOffset();
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
new file mode 100644
index 0000000..932aece
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class FileOffsetIndexer implements IExternalIndexer {
+
+ private static final long serialVersionUID = 1L;
+ public static final int NUM_OF_FIELDS = 2;
+ protected AMutableInt32 fileNumber = new AMutableInt32(0);
+ protected AMutableInt64 offset = new AMutableInt64(0);
+ protected RecordReader<?, Writable> recordReader;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<IAObject> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+
+ @Override
+ public void reset(IRecordReader<?> reader) throws IOException {
+ //TODO: Make it more generic since we can't assume it is always going to be HDFS records.
+ @SuppressWarnings("unchecked")
+ HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
+ fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
+ recordReader = hdfsReader.getReader();
+ offset.setValue(recordReader.getPos());
+ }
+
+ @Override
+ public void index(ArrayTupleBuilder tb) throws IOException {
+ tb.addField(intSerde, fileNumber);
+ tb.addField(longSerde, offset);
+ // Get position for next index(tb) call
+ offset.setValue(recordReader.getPos());
+ }
+
+ @Override
+ public int getNumberOfFields() {
+ return NUM_OF_FIELDS;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
new file mode 100644
index 0000000..870a6df
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
+public class IndexingScheduler {
+ private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
+
+ /** a list of NCs */
+ private String[] NCs;
+
+ /** a map from ip to NCs */
+ private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+ /** a map from the NC name to the index */
+ private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+ /** a map from NC name to the NodeControllerInfo */
+ private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ public IndexingScheduler(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file
+ * splits. It tries to assign splits to their local machines fairly
+ * Locality is more important than fairness
+ *
+ * @throws HyracksDataException
+ */
+ public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
+ if (splits == null) {
+ /** deal the case when the splits array is null */
+ return new String[] {};
+ }
+ int[] workloads = new int[NCs.length];
+ Arrays.fill(workloads, 0);
+ String[] locations = new String[splits.length];
+ Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+ /**
+ * upper bound is number of splits
+ */
+ int upperBoundSlots = splits.length;
+
+ try {
+ Random random = new Random(System.currentTimeMillis());
+ boolean scheduled[] = new boolean[splits.length];
+ Arrays.fill(scheduled, false);
+ /**
+ * scan the splits and build the popularity map
+ * give the machines with less local splits more scheduling priority
+ */
+ buildPopularityMap(splits, locationToNumOfSplits);
+ HashMap<String, Integer> locationToNumOfAssignement = new HashMap<String, Integer>();
+ for (String location : locationToNumOfSplits.keySet()) {
+ locationToNumOfAssignement.put(location, 0);
+ }
+ /**
+ * push data-local upper-bounds slots to each machine
+ */
+ scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits,
+ locationToNumOfAssignement);
+
+ int dataLocalCount = 0;
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i] == true) {
+ dataLocalCount++;
+ }
+ }
+ LOGGER.info("Data local rate: "
+ + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
+ /**
+ * push non-data-local upper-bounds slots to each machine
+ */
+ locationToNumOfAssignement.clear();
+ for (String nc : NCs) {
+ locationToNumOfAssignement.put(nc, 0);
+ }
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i]) {
+ locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i]) + 1);
+ }
+ }
+
+ scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled, locationToNumOfAssignement);
+ return locations;
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /**
+ * Schedule non-local slots to each machine
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param workloads
+ * The current capacity of each machine.
+ * @param locations
+ * The result schedule.
+ * @param slotLimit
+ * The maximum slots of each machine.
+ * @param scheduled
+ * Indicate which slot is scheduled.
+ * @param locationToNumOfAssignement
+ */
+ private void scheduleNonLocalSlots(InputSplit[] splits, final int[] workloads, String[] locations, int slotLimit,
+ boolean[] scheduled, final HashMap<String, Integer> locationToNumOfAssignement)
+ throws IOException, UnknownHostException {
+
+ PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(NCs.length, new Comparator<String>() {
+ @Override
+ public int compare(String s1, String s2) {
+ return locationToNumOfAssignement.get(s1).compareTo(locationToNumOfAssignement.get(s2));
+ }
+
+ });
+
+ for (String nc : NCs) {
+ scheduleCadndiates.add(nc);
+ }
+ /**
+ * schedule no-local file reads
+ */
+ for (int i = 0; i < splits.length; i++) {
+ /** if there is no data-local NC choice, choose a random one */
+ if (!scheduled[i]) {
+ String selectedNcName = scheduleCadndiates.remove();
+ if (selectedNcName != null) {
+ int ncIndex = ncNameToIndex.get(selectedNcName);
+ workloads[ncIndex]++;
+ scheduled[i] = true;
+ locations[i] = selectedNcName;
+ locationToNumOfAssignement.put(selectedNcName, workloads[ncIndex]);
+ scheduleCadndiates.add(selectedNcName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule data-local slots to each machine.
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param workloads
+ * The current capacity of each machine.
+ * @param locations
+ * The result schedule.
+ * @param slots
+ * The maximum slots of each machine.
+ * @param random
+ * The random generator.
+ * @param scheduled
+ * Indicate which slot is scheduled.
+ * @throws IOException
+ * @throws UnknownHostException
+ */
+ private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
+ boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits,
+ final HashMap<String, Integer> locationToNumOfAssignement) throws IOException, UnknownHostException {
+ /** scheduling candidates will be ordered inversely according to their popularity */
+ PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
+ @Override
+ public int compare(String s1, String s2) {
+ int assignmentDifference = locationToNumOfAssignement.get(s1)
+ .compareTo(locationToNumOfAssignement.get(s2));
+ if (assignmentDifference != 0) {
+ return assignmentDifference;
+ }
+ return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
+ }
+
+ });
+
+ for (int i = 0; i < splits.length; i++) {
+ if (scheduled[i]) {
+ continue;
+ }
+ /**
+ * get the location of all the splits
+ */
+ String[] locs = splits[i].getLocations();
+ if (locs.length > 0) {
+ scheduleCadndiates.clear();
+ for (int j = 0; j < locs.length; j++) {
+ scheduleCadndiates.add(locs[j]);
+ }
+
+ for (String candidate : scheduleCadndiates) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(candidate);
+ /**
+ * iterate overa all ips
+ */
+ for (InetAddress ip : allIps) {
+ /**
+ * if the node controller exists
+ */
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ /**
+ * set the ncs
+ */
+ List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+ int arrayPos = random.nextInt(dataLocations.size());
+ String nc = dataLocations.get(arrayPos);
+ int pos = ncNameToIndex.get(nc);
+ /**
+ * check if the node is already full
+ */
+ if (workloads[pos] < slots) {
+ locations[i] = nc;
+ workloads[pos]++;
+ scheduled[i] = true;
+ locationToNumOfAssignement.put(candidate,
+ locationToNumOfAssignement.get(candidate) + 1);
+ break;
+ }
+ }
+ }
+ /**
+ * break the loop for data-locations if the schedule has
+ * already been found
+ */
+ if (scheduled[i] == true) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Scan the splits once and build a popularity map
+ *
+ * @param splits
+ * the split array
+ * @param locationToNumOfSplits
+ * the map to be built
+ * @throws IOException
+ */
+ private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
+ throws IOException {
+ for (InputSplit split : splits) {
+ String[] locations = split.getLocations();
+ for (String loc : locations) {
+ IntWritable locCount = locationToNumOfSplits.get(loc);
+ if (locCount == null) {
+ locCount = new IntWritable(0);
+ locationToNumOfSplits.put(loc, locCount);
+ }
+ locCount.set(locCount.get() + 1);
+ }
+ }
+ }
+
+ /**
+ * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ try {
+ NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping.clear();
+ ncNameToIndex.clear();
+ int i = 0;
+
+ /**
+ * build the IP address to NC map
+ */
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+ if (matchedNCs == null) {
+ matchedNCs = new ArrayList<String>();
+ ipToNcMapping.put(ipAddr, matchedNCs);
+ }
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+
+ /**
+ * set up the NC name to index mapping
+ */
+ for (i = 0; i < NCs.length; i++) {
+ ncNameToIndex.put(NCs[i], i);
+ }
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+}