You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:31:58 UTC
[04/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index 09e193a..dc9fb50 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -33,14 +33,13 @@ import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.api.IFeed.FeedType;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.Feed.FeedType;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.entities.SecondaryFeed;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.ARecord;
@@ -82,7 +81,7 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
int recordLength = frameTuple.getFieldLength(FEED_PAYLOAD_TUPLE_FIELD_INDEX);
ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
DataInput in = new DataInputStream(stream);
- ARecord feedRecord = (ARecord) recordSerDes.deserialize(in);
+ ARecord feedRecord = recordSerDes.deserialize(in);
return createFeedFromARecord(feedRecord);
}
@@ -103,18 +102,18 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX))
.getStringValue();
- FeedType feedTypeEnum = FeedType.valueOf(feedType.toUpperCase());
+ IFeed.FeedType feedTypeEnum = IFeed.FeedType.valueOf(feedType.toUpperCase());
switch (feedTypeEnum) {
case PRIMARY: {
ARecord feedTypeDetailsRecord = (ARecord) feedRecord
.getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX);
String adapterName = ((AString) feedTypeDetailsRecord
.getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX))
- .getStringValue();
+ .getStringValue();
- IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord
- .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
- .getCursor();
+ IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord.getValueByPos(
+ MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
+ .getCursor();
String key;
String value;
Map<String, String> adaptorConfiguration = new HashMap<String, String>();
@@ -126,7 +125,8 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
.getStringValue();
adaptorConfiguration.put(key, value);
}
- feed = new PrimaryFeed(dataverseName, feedName, adapterName, adaptorConfiguration, signature);
+ feed = new Feed(dataverseName, feedName, signature, FeedType.PRIMARY, feedName, adapterName,
+ adaptorConfiguration);
}
break;
@@ -136,9 +136,9 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
String sourceFeedName = ((AString) feedTypeDetailsRecord
.getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX))
- .getStringValue();
+ .getStringValue();
- feed = new SecondaryFeed(dataverseName, feedName, sourceFeedName, signature);
+ feed = new Feed(dataverseName, feedName, signature, FeedType.SECONDARY, sourceFeedName, null, null);
}
break;
@@ -215,7 +215,6 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
switch (feed.getFeedType()) {
case PRIMARY: {
- PrimaryFeed primaryFeed = (PrimaryFeed) feed;
IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder();
OrderedListBuilder listBuilder = new OrderedListBuilder();
@@ -229,16 +228,16 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
// write field 0
fieldValue.reset();
- aString.setValue(primaryFeed.getAdaptorName());
+ aString.setValue(feed.getAdapterName());
stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput());
primaryDetailsRecordBuilder.addField(
MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX,
primaryRecordfieldValue);
// write field 1
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
- for (Map.Entry<String, String> property : primaryFeed.getAdaptorConfiguration().entrySet()) {
+ listBuilder.reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE
+ .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
+ for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) {
String name = property.getKey();
String value = property.getValue();
primaryRecordItemValue.reset();
@@ -262,15 +261,13 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
break;
case SECONDARY:
- SecondaryFeed secondaryFeed = (SecondaryFeed) feed;
-
IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder();
ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage();
secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE);
// write field 0
fieldValue.reset();
- aString.setValue(secondaryFeed.getSourceFeedName());
+ aString.setValue(feed.getSourceFeedName());
stringSerde.serialize(aString, secondaryFieldValue.getDataOutput());
secondaryDetailsRecordBuilder.addField(
MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX,
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
deleted file mode 100644
index d65468e..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Represents the base class that is required to be extended by every
- * implementation of the IDatasourceAdapter interface.
- */
-public abstract class AbstractDatasourceAdapter implements IDataSourceAdapter {
-
- private static final long serialVersionUID = 1L;
-
- public static final String KEY_PARSER_FACTORY = "parser";
-
- protected Map<String, Object> configuration;
- protected transient AlgebricksPartitionConstraint partitionConstraint;
- protected IAType atype;
- protected IHyracksTaskContext ctx;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
deleted file mode 100644
index c231ad9..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-
-
-public abstract class AbstractFeedDatasourceAdapter implements IDataSourceAdapter {
-
- private static final long serialVersionUID = 1L;
-
- protected FeedPolicyEnforcer policyEnforcer;
-
- public FeedPolicyEnforcer getPolicyEnforcer() {
- return policyEnforcer;
- }
-
- public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
- this.policyEnforcer = policyEnforcer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
deleted file mode 100644
index 6c2f14c..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-
-public class AdapterExecutor implements Runnable {
-
- private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
-
- private final DistributeFeedFrameWriter writer;
-
- private final IDataSourceAdapter adapter;
-
- private final IAdapterRuntimeManager adapterManager;
-
- public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IDataSourceAdapter adapter,
- IAdapterRuntimeManager adapterManager) {
- this.writer = writer;
- this.adapter = adapter;
- this.adapterManager = adapterManager;
- }
-
- @Override
- public void run() {
- int partition = adapterManager.getPartition();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting ingestion for partition:" + partition);
- }
- boolean continueIngestion = true;
- boolean failedIngestion = false;
- while (continueIngestion) {
- try {
- adapter.start(partition, writer);
- continueIngestion = false;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Exception during feed ingestion " + e.getMessage());
- e.printStackTrace();
- }
- continueIngestion = adapter.handleException(e);
- failedIngestion = !continueIngestion;
- }
- }
-
- adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
- synchronized (adapterManager) {
- adapterManager.notifyAll();
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
deleted file mode 100644
index f7e528b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a data source adapter.
- */
-public class AdapterIdentifier implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final String namespace;
- private final String name;
-
- public AdapterIdentifier(String namespace, String name) {
- this.namespace = namespace;
- this.name = name;
- }
-
- public String getNamespace() {
- return namespace;
- }
-
- public String getName() {
- return name;
- }
-
- @Override
- public int hashCode() {
- return (namespace + "@" + name).hashCode();
-
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null) {
- return false;
- }
- if (this == o) {
- return true;
- }
- if (!(o instanceof AdapterIdentifier)) {
- return false;
- }
- return namespace.equals(((AdapterIdentifier) o).getNamespace())
- && name.equals(((AdapterIdentifier) o).getName());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
deleted file mode 100644
index aacb3da..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-
-public class AdapterRuntimeManager implements IAdapterRuntimeManager {
-
- private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
-
- private final FeedId feedId;
-
- private final IDataSourceAdapter feedAdapter;
-
- private final IIntakeProgressTracker tracker;
-
- private final AdapterExecutor adapterExecutor;
-
- private final int partition;
-
- private final ExecutorService executorService;
-
- private IngestionRuntime ingestionRuntime;
-
- private State state;
-
- public AdapterRuntimeManager(FeedId feedId, IDataSourceAdapter feedAdapter, IIntakeProgressTracker tracker,
- DistributeFeedFrameWriter writer, int partition) {
- this.feedId = feedId;
- this.feedAdapter = feedAdapter;
- this.tracker = tracker;
- this.partition = partition;
- this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
- this.executorService = Executors.newSingleThreadExecutor();
- this.state = State.INACTIVE_INGESTION;
- }
-
- @Override
- public void start() throws Exception {
- state = State.ACTIVE_INGESTION;
- executorService.execute(adapterExecutor);
- }
-
- @Override
- public void stop() {
- try {
- feedAdapter.stop();
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to stop adapter " + feedAdapter + ", encountered exception " + exception);
- }
- } finally {
- state = State.FINISHED_INGESTION;
- executorService.shutdown();
- }
- }
-
- @Override
- public FeedId getFeedId() {
- return feedId;
- }
-
- @Override
- public String toString() {
- return feedId + "[" + partition + "]";
- }
-
- @Override
- public IDataSourceAdapter getFeedAdapter() {
- return feedAdapter;
- }
-
- public IIntakeProgressTracker getTracker() {
- return tracker;
- }
-
- @Override
- public synchronized State getState() {
- return state;
- }
-
- @Override
- public synchronized void setState(State state) {
- this.state = state;
- }
-
- public AdapterExecutor getAdapterExecutor() {
- return adapterExecutor;
- }
-
- @Override
- public int getPartition() {
- return partition;
- }
-
- public IngestionRuntime getIngestionRuntime() {
- return ingestionRuntime;
- }
-
- @Override
- public IIntakeProgressTracker getProgressTracker() {
- return tracker;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
index a144d5f..8ef6732 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -21,37 +21,37 @@ package org.apache.asterix.metadata.feeds;
import java.util.HashMap;
import java.util.Map;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
-import org.apache.asterix.metadata.entities.FeedPolicy;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
public class BuiltinFeedPolicies {
- public static final FeedPolicy BRITTLE = initializeBrittlePolicy();
+ public static final FeedPolicyEntity BRITTLE = initializeBrittlePolicy();
- public static final FeedPolicy BASIC = initializeBasicPolicy();
+ public static final FeedPolicyEntity BASIC = initializeBasicPolicy();
- public static final FeedPolicy BASIC_FT = initializeBasicFTPolicy();
+ public static final FeedPolicyEntity BASIC_FT = initializeBasicFTPolicy();
- public static final FeedPolicy ADVANCED_FT = initializeAdvancedFTPolicy();
+ public static final FeedPolicyEntity ADVANCED_FT = initializeAdvancedFTPolicy();
- public static final FeedPolicy ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
+ public static final FeedPolicyEntity ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
- public static final FeedPolicy ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
+ public static final FeedPolicyEntity ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
- public static final FeedPolicy ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
+ public static final FeedPolicyEntity ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
- public static final FeedPolicy ELASTIC = initializeAdvancedFTElasticPolicy();
+ public static final FeedPolicyEntity ELASTIC = initializeAdvancedFTElasticPolicy();
- public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
+ public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, ELASTIC };
- public static final FeedPolicy DEFAULT_POLICY = BASIC_FT;
+ public static final FeedPolicyEntity DEFAULT_POLICY = BASIC_FT;
public static final String CONFIG_FEED_POLICY_KEY = "policy";
- public static FeedPolicy getFeedPolicy(String policyName) {
- for (FeedPolicy policy : policies) {
+ public static FeedPolicyEntity getFeedPolicy(String policyName) {
+ for (FeedPolicyEntity policy : policies) {
if (policy.getPolicyName().equalsIgnoreCase(policyName)) {
return policy;
}
@@ -60,7 +60,7 @@ public class BuiltinFeedPolicies {
}
//Brittle
- private static FeedPolicy initializeBrittlePolicy() {
+ private static FeedPolicyEntity initializeBrittlePolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
@@ -71,11 +71,11 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
String description = "Brittle";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
}
//Basic
- private static FeedPolicy initializeBasicPolicy() {
+ private static FeedPolicyEntity initializeBasicPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -85,11 +85,11 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
String description = "Basic";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
}
// BasicFT
- private static FeedPolicy initializeBasicFTPolicy() {
+ private static FeedPolicyEntity initializeBasicFTPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -103,11 +103,11 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
String description = "Basic Monitored Fault-Tolerant";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
}
// AdvancedFT
- private static FeedPolicy initializeAdvancedFTPolicy() {
+ private static FeedPolicyEntity initializeAdvancedFTPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -118,11 +118,11 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
String description = "Basic Monitored Fault-Tolerant with at least once semantics";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
}
// AdvancedFT_Discard
- private static FeedPolicy initializeAdvancedFTDiscardPolicy() {
+ private static FeedPolicyEntity initializeAdvancedFTDiscardPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -133,14 +133,14 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
-
+
String description = "AdvancedFT 100% Discard during congestion";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
policyParams);
}
// AdvancedFT_Spill
- private static FeedPolicy initializeAdvancedFTSpillPolicy() {
+ private static FeedPolicyEntity initializeAdvancedFTSpillPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -152,11 +152,11 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
String description = "AdvancedFT 100% Discard during congestion";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
}
// AdvancedFT_Spill
- private static FeedPolicy initializeAdvancedFTThrottlePolicy() {
+ private static FeedPolicyEntity initializeAdvancedFTThrottlePolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -169,12 +169,12 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
String description = "AdvancedFT Throttle during congestion";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
policyParams);
}
// AdvancedFT_Elastic
- private static FeedPolicy initializeAdvancedFTElasticPolicy() {
+ private static FeedPolicyEntity initializeAdvancedFTElasticPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -185,7 +185,7 @@ public class BuiltinFeedPolicies {
policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
String description = "Basic Monitored Fault-Tolerant Elastic";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
+ return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
policyParams);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
deleted file mode 100644
index 30b369a..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
-
- private final FeedConnectionId connectionId;
- private IFrameWriter downstreamWriter;
- private final FrameTupleAccessor inputFrameTupleAccessor;
- private final FrameTupleAppender tupleAppender;
- private final IFrame frame;
-
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-
- public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
- ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
- throws HyracksDataException {
- this.downstreamWriter = downstreamWriter;
- RecordDescriptor inputRecordDescriptor = sourceRuntime.getRecordDescriptor();
- inputFrameTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
- tupleAppender = new FrameTupleAppender();
- frame = new VSizeFrame(ctx);
- tupleAppender.reset(frame, true);
- this.connectionId = connectionId;
- }
-
- @Override
- public void open() throws HyracksDataException {
- downstreamWriter.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputFrameTupleAccessor.reset(buffer);
- int nTuple = inputFrameTupleAccessor.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
- appendTupleToFrame();
- tupleBuilder.reset();
- }
- }
-
- private void appendTupleToFrame() throws HyracksDataException {
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
- tupleAppender.reset(frame, true);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- downstreamWriter.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- downstreamWriter.close();
- }
-
- @Override
- public FeedId getFeedId() {
- return connectionId.getFeedId();
- }
-
- @Override
- public Type getType() {
- return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
- }
-
- public IFrameWriter getDownstreamWriter() {
- return downstreamWriter;
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public void reset(IFrameWriter writer) {
- this.downstreamWriter = writer;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
deleted file mode 100644
index fee99d8..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/*
- * A single activity operator that provides the functionality of scanning data using an
- * instance of the configured adapter.
- */
-public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private IAdapterFactory adapterFactory;
-
- public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
- IAdapterFactory dataSourceAdapterFactory) {
- super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactory = dataSourceAdapterFactory;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
-
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
- @Override
- public void initialize() throws HyracksDataException {
- IDataSourceAdapter adapter = null;
- try {
- writer.open();
- adapter = adapterFactory.createAdapter(ctx, partition);
- adapter.start(partition, writer);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
- }
- }
- };
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
deleted file mode 100644
index a0a4af9..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FeedActivityIdFactory {
- private static AtomicInteger id = new AtomicInteger();
- private static boolean isInitialized = false;
-
- public static boolean isInitialized() {
- return isInitialized;
- }
-
- public static void initialize(int initialId) {
- id.set(initialId);
- isInitialized = true;
- }
-
- public static int generateFeedActivityId() {
- return id.incrementAndGet();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
deleted file mode 100644
index 715b68b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * FeedCollectOperatorDescriptor is responsible for ingesting data from an external source. This
- * operator uses a user specified for a built-in adaptor for retrieving data from the external
- * data source.
- */
-public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
-
- /** The type associated with the ADM data output from the feed adaptor */
- private final IAType outputType;
-
- /** unique identifier for a feed instance. */
- private final FeedConnectionId connectionId;
-
- /** Map representation of policy parameters */
- private final Map<String, String> feedPolicyProperties;
-
- /** The (singleton) instance of {@code IFeedIngestionManager} **/
- private IFeedSubscriptionManager subscriptionManager;
-
- /** The source feed from which the feed derives its data from. **/
- private final FeedId sourceFeedId;
-
- /** The subscription location at which the recipient feed receives tuples from the source feed **/
- private final ConnectionLocation subscriptionLocation;
-
- public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
- ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
- ConnectionLocation subscriptionLocation) {
- super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
- this.outputType = atype;
- this.connectionId = feedConnectionId;
- this.feedPolicyProperties = feedPolicyProperties;
- this.sourceFeedId = sourceFeedId;
- this.subscriptionLocation = subscriptionLocation;
- }
-
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- this.subscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
- ISubscribableRuntime sourceRuntime = null;
- IOperatorNodePushable nodePushable = null;
- switch (subscriptionLocation) {
- case SOURCE_FEED_INTAKE_STAGE:
- try {
- SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
- FeedRuntimeType.INTAKE, partition);
- sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
- if (sourceRuntime == null) {
- throw new HyracksDataException("Source intake task not found for source feed id "
- + sourceFeedId);
- }
- nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
- feedPolicyProperties, partition, nPartitions, sourceRuntime);
-
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
- }
- throw new HyracksDataException("Initialization of the feed adapter failed", exception);
- }
- break;
- case SOURCE_FEED_COMPUTE_STAGE:
- SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
- FeedRuntimeType.COMPUTE, partition);
- sourceRuntime = (ISubscribableRuntime) subscriptionManager
- .getSubscribableRuntime(feedSubscribableRuntimeId);
- if (sourceRuntime == null) {
- throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
- + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
- }
- nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
- feedPolicyProperties, partition, nPartitions, sourceRuntime);
- break;
- }
- return nodePushable;
- }
-
- public FeedConnectionId getFeedConnectionId() {
- return connectionId;
- }
-
- public Map<String, String> getFeedPolicyProperties() {
- return feedPolicyProperties;
- }
-
- public IAType getOutputType() {
- return outputType;
- }
-
- public RecordDescriptor getRecordDescriptor() {
- return recordDescriptors[0];
- }
-
- public FeedId getSourceFeedId() {
- return sourceFeedId;
- }
-
- private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
- int waitCycleCount = 0;
- ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- while (ingestionRuntime == null && waitCycleCount < 10) {
- try {
- Thread.sleep(2000);
- waitCycleCount++;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- break;
- }
- ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- }
- return (IngestionRuntime) ingestionRuntime;
- }
-
- public ConnectionLocation getSubscriptionLocation() {
- return subscriptionLocation;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
deleted file mode 100644
index 8f9c8f3..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * The runtime for @see{FeedIntakeOperationDescriptor}
- */
-public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
- private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
-
- private final int partition;
- private final FeedConnectionId connectionId;
- private final Map<String, String> feedPolicy;
- private final FeedPolicyAccessor policyAccessor;
- private final IFeedManager feedManager;
- private final ISubscribableRuntime sourceRuntime;
- private final IHyracksTaskContext ctx;
- private final int nPartitions;
-
- private RecordDescriptor outputRecordDescriptor;
- private FeedRuntimeInputHandler inputSideHandler;
- private CollectionRuntime collectRuntime;
-
- public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
- FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
- ISubscribableRuntime sourceRuntime) {
- this.ctx = ctx;
- this.partition = partition;
- this.nPartitions = nPartitions;
- this.connectionId = feedConnectionId;
- this.sourceRuntime = sourceRuntime;
- this.feedPolicy = feedPolicy;
- policyAccessor = new FeedPolicyAccessor(feedPolicy);
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- this.feedManager = runtimeCtx.getFeedManager();
- }
-
- @Override
- public void initialize() throws HyracksDataException {
- try {
- outputRecordDescriptor = recordDesc;
- FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
- .getFeedRuntimeType();
- switch (sourceRuntimeType) {
- case INTAKE:
- handleCompleteConnection();
- break;
- case COMPUTE:
- handlePartialConnection();
- break;
- default:
- throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
- }
-
- State state = collectRuntime.waitTillCollectionOver();
- if (state.equals(State.FINISHED)) {
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
- collectRuntime.getRuntimeId());
- writer.close();
- inputSideHandler.close();
- } else if (state.equals(State.HANDOVER)) {
- inputSideHandler.setMode(Mode.STALL);
- writer.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
- + " and the output writer " + writer + " has been closed ");
- }
- }
- } catch (InterruptedException ie) {
- handleInterruptedException(ie);
- } catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- }
- }
-
- private void handleCompleteConnection() throws Exception {
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
- FeedRuntimeId.DEFAULT_OPERAND_ID);
- collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
- runtimeId);
- if (collectRuntime == null) {
- beginNewFeed(runtimeId);
- } else {
- reviveOldFeed();
- }
- }
-
- private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
- writer.open();
- IFrameWriter outputSideWriter = writer;
- if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType().equals(
- FeedRuntimeType.COMPUTE)) {
- outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
- connectionId);
- this.recordDesc = sourceRuntime.getRecordDescriptor();
- }
-
- FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
- inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter, policyAccessor,
- false, tupleAccessor, recordDesc,
- feedManager, nPartitions);
-
- collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
- sourceRuntime, feedPolicy);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
- sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
- }
-
- private void reviveOldFeed() throws HyracksDataException {
- writer.open();
- collectRuntime.getFrameCollector().setState(State.ACTIVE);
- inputSideHandler = collectRuntime.getInputHandler();
-
- IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
- if (innerWriter instanceof CollectTransformFeedFrameWriter) {
- ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
- } else {
- inputSideHandler.setCoreOperator(writer);
- }
-
- inputSideHandler.setMode(Mode.PROCESS);
- }
-
- private void handlePartialConnection() throws Exception {
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
- FeedRuntimeId.DEFAULT_OPERAND_ID);
- writer.open();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
- }
- IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
- outputRecordDescriptor, connectionId);
-
- inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
- new FrameTupleAccessor(recordDesc), recordDesc, feedManager,
- nPartitions);
-
- collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
- feedPolicy);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
- recordDesc = sourceRuntime.getRecordDescriptor();
- sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
- }
-
- private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
- if (policyAccessor.continueOnHardwareFailure()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
- + " until failure is resolved");
- }
- inputSideHandler.setMode(Mode.STALL);
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
- + " as feed is not configured to handle failures");
- }
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
- writer.close();
- throw new HyracksDataException(ie);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
deleted file mode 100644
index 7356c94..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-import org.apache.asterix.common.feeds.api.IFeedConnectionManager;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedConnectionManager implements IFeedConnectionManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
-
- private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
- private final String nodeId;
-
- public FeedConnectionManager(String nodeId) {
- this.nodeId = nodeId;
- }
-
- public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
- return feedRuntimeManagers.get(feedId);
- }
-
- @Override
- public void deregisterFeed(FeedConnectionId feedId) {
- try {
- FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
- if (mgr != null) {
- mgr.close();
- feedRuntimeManagers.remove(feedId);
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
- }
- }
-
- }
-
- @Override
- public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
- throws Exception {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- if (runtimeMgr == null) {
- runtimeMgr = new FeedRuntimeManager(connectionId, this);
- feedRuntimeManagers.put(connectionId, runtimeMgr);
- }
- runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
- }
-
- @Override
- public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- if (runtimeMgr != null) {
- runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
- }
- }
-
- @Override
- public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
- }
-
- @Override
- public String toString() {
- return "FeedManager " + "[" + nodeId + "]";
- }
-
- @Override
- public List<FeedRuntimeId> getRegisteredRuntimes() {
- List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
- for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
- runtimes.addAll(entry.getValue().getFeedRuntimes());
- }
- return runtimes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
deleted file mode 100644
index 6ee14d8..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public class FeedFrameTupleDecorator {
-
- private AMutableString aString = new AMutableString("");
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AtomicInteger tupleId;
-
- @SuppressWarnings("unchecked")
- private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ASTRING);
- @SuppressWarnings("unchecked")
- private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- @SuppressWarnings("unchecked")
- private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
-
- private final int partition;
- private final ArrayBackedValueStorage attrNameStorage;
- private final ArrayBackedValueStorage attrValueStorage;
-
- public FeedFrameTupleDecorator(int partition) {
- this.tupleId = new AtomicInteger(0);
- this.partition = partition;
- this.attrNameStorage = new ArrayBackedValueStorage();
- this.attrValueStorage = new ArrayBackedValueStorage();
- }
-
- public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
- throws HyracksDataException, AsterixException {
- attrNameStorage.reset();
- aString.setValue(attrName);
- stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
- attrValueStorage.reset();
- aInt64.setValue(attrValue);
- int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
-
- recordBuilder.addField(attrNameStorage, attrValueStorage);
- }
-
- public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
- throws HyracksDataException, AsterixException {
- attrNameStorage.reset();
- aString.setValue(attrName);
- stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
- attrValueStorage.reset();
- aInt32.setValue(attrValue);
- int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
-
- recordBuilder.addField(attrNameStorage, attrValueStorage);
- }
-
- public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
- addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
- }
-
- public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
- addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
- }
-
- public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
- addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
- }
-
- public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
- addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
deleted file mode 100644
index 54c9af5..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.library.ExternalLibraryManager;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * An operator responsible for establishing connection with external data source and parsing,
- * translating the received content.It uses an instance of feed adaptor to perform these functions.
- */
-public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
-
- /** The unique identifier of the feed that is being ingested. **/
- private final FeedId feedId;
-
- private final FeedPolicyAccessor policyAccessor;
-
- /** The adaptor factory that is used to create an instance of the feed adaptor **/
- private IAdapterFactory adaptorFactory;
-
- /** The library that contains the adapter in use. **/
- private String adaptorLibraryName;
-
- /**
- * The adapter factory class that is used to create an instance of the feed adapter.
- * This value is used only in the case of external adapters.
- **/
- private String adaptorFactoryClassName;
-
- /** The configuration parameters associated with the adapter. **/
- private Map<String, String> adaptorConfiguration;
-
- private ARecordType adapterOutputType;
-
- public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, IAdapterFactory adapterFactory,
- ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
- super(spec, 0, 1);
- this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
- this.adaptorFactory = adapterFactory;
- this.adapterOutputType = adapterOutputType;
- this.policyAccessor = policyAccessor;
- }
-
- public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, String adapterLibraryName,
- String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
- super(spec, 0, 1);
- this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
- this.adaptorFactoryClassName = adapterFactoryClassName;
- this.adaptorLibraryName = adapterLibraryName;
- this.adaptorConfiguration = primaryFeed.getAdaptorConfiguration();
- this.adapterOutputType = adapterOutputType;
- this.policyAccessor = policyAccessor;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- IFeedSubscriptionManager feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
- SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
- partition);
- IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
- .getSubscribableRuntime(feedIngestionId);
- if (adaptorFactory == null) {
- try {
- adaptorFactory = createExtenralAdapterFactory(ctx, partition);
- } catch (Exception exception) {
- throw new HyracksDataException(exception);
- }
-
- }
- return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
- policyAccessor);
- }
-
- private IAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
- IAdapterFactory adapterFactory = null;
- ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
- adaptorLibraryName);
- if (classLoader != null) {
- adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
- adapterFactory.configure(adaptorConfiguration, adapterOutputType);
- } else {
- String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
- + " in dataverse " + feedId.getDataverse();
- LOGGER.severe(message);
- throw new IllegalArgumentException(message);
- }
- return adapterFactory;
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-
-}