You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:31:58 UTC

[04/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index 09e193a..dc9fb50 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -33,14 +33,13 @@ import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.api.IFeed.FeedType;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.Feed.FeedType;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.entities.SecondaryFeed;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.ARecord;
@@ -82,7 +81,7 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
         int recordLength = frameTuple.getFieldLength(FEED_PAYLOAD_TUPLE_FIELD_INDEX);
         ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
         DataInput in = new DataInputStream(stream);
-        ARecord feedRecord = (ARecord) recordSerDes.deserialize(in);
+        ARecord feedRecord = recordSerDes.deserialize(in);
         return createFeedFromARecord(feedRecord);
     }
 
@@ -103,18 +102,18 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
         String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX))
                 .getStringValue();
 
-        FeedType feedTypeEnum = FeedType.valueOf(feedType.toUpperCase());
+        IFeed.FeedType feedTypeEnum = IFeed.FeedType.valueOf(feedType.toUpperCase());
         switch (feedTypeEnum) {
             case PRIMARY: {
                 ARecord feedTypeDetailsRecord = (ARecord) feedRecord
                         .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX);
                 String adapterName = ((AString) feedTypeDetailsRecord
                         .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX))
-                        .getStringValue();
+                                .getStringValue();
 
-                IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
-                        .getCursor();
+                IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord.getValueByPos(
+                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
+                                .getCursor();
                 String key;
                 String value;
                 Map<String, String> adaptorConfiguration = new HashMap<String, String>();
@@ -126,7 +125,8 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
                             .getStringValue();
                     adaptorConfiguration.put(key, value);
                 }
-                feed = new PrimaryFeed(dataverseName, feedName, adapterName, adaptorConfiguration, signature);
+                feed = new Feed(dataverseName, feedName, signature, FeedType.PRIMARY, feedName, adapterName,
+                        adaptorConfiguration);
 
             }
                 break;
@@ -136,9 +136,9 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
 
                 String sourceFeedName = ((AString) feedTypeDetailsRecord
                         .getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX))
-                        .getStringValue();
+                                .getStringValue();
 
-                feed = new SecondaryFeed(dataverseName, feedName, sourceFeedName, signature);
+                feed = new Feed(dataverseName, feedName, signature, FeedType.SECONDARY, sourceFeedName, null, null);
 
             }
                 break;
@@ -215,7 +215,6 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
 
         switch (feed.getFeedType()) {
             case PRIMARY: {
-                PrimaryFeed primaryFeed = (PrimaryFeed) feed;
 
                 IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder();
                 OrderedListBuilder listBuilder = new OrderedListBuilder();
@@ -229,16 +228,16 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
 
                 // write field 0
                 fieldValue.reset();
-                aString.setValue(primaryFeed.getAdaptorName());
+                aString.setValue(feed.getAdapterName());
                 stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput());
                 primaryDetailsRecordBuilder.addField(
                         MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX,
                         primaryRecordfieldValue);
 
                 // write field 1
-                listBuilder
-                        .reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
-                for (Map.Entry<String, String> property : primaryFeed.getAdaptorConfiguration().entrySet()) {
+                listBuilder.reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE
+                        .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
+                for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) {
                     String name = property.getKey();
                     String value = property.getValue();
                     primaryRecordItemValue.reset();
@@ -262,15 +261,13 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
                 break;
 
             case SECONDARY:
-                SecondaryFeed secondaryFeed = (SecondaryFeed) feed;
-
                 IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder();
                 ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage();
                 secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE);
 
                 // write field 0
                 fieldValue.reset();
-                aString.setValue(secondaryFeed.getSourceFeedName());
+                aString.setValue(feed.getSourceFeedName());
                 stringSerde.serialize(aString, secondaryFieldValue.getDataOutput());
                 secondaryDetailsRecordBuilder.addField(
                         MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
deleted file mode 100644
index d65468e..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractDatasourceAdapter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Represents the base class that is required to be extended by every
- * implementation of the IDatasourceAdapter interface.
- */
-public abstract class AbstractDatasourceAdapter implements IDataSourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final String KEY_PARSER_FACTORY = "parser";
-
-    protected Map<String, Object> configuration;
-    protected transient AlgebricksPartitionConstraint partitionConstraint;
-    protected IAType atype;
-    protected IHyracksTaskContext ctx;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
deleted file mode 100644
index c231ad9..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-
-
-public abstract class AbstractFeedDatasourceAdapter implements IDataSourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    protected FeedPolicyEnforcer policyEnforcer;
-
-    public FeedPolicyEnforcer getPolicyEnforcer() {
-        return policyEnforcer;
-    }
-
-    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
-        this.policyEnforcer = policyEnforcer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
deleted file mode 100644
index 6c2f14c..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-
-public class AdapterExecutor implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
-
-    private final DistributeFeedFrameWriter writer;
-
-    private final IDataSourceAdapter adapter;
-
-    private final IAdapterRuntimeManager adapterManager;
-
-    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IDataSourceAdapter adapter,
-            IAdapterRuntimeManager adapterManager) {
-        this.writer = writer;
-        this.adapter = adapter;
-        this.adapterManager = adapterManager;
-    }
-
-    @Override
-    public void run() {
-        int partition = adapterManager.getPartition();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting ingestion for partition:" + partition);
-        }
-        boolean continueIngestion = true;
-        boolean failedIngestion = false;
-        while (continueIngestion) {
-            try {
-                adapter.start(partition, writer);
-                continueIngestion = false;
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Exception during feed ingestion " + e.getMessage());
-                    e.printStackTrace();
-                }
-                continueIngestion = adapter.handleException(e);
-                failedIngestion = !continueIngestion;
-            }
-        }
-
-        adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
-        synchronized (adapterManager) {
-            adapterManager.notifyAll();
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
deleted file mode 100644
index f7e528b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterIdentifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a data source adapter.
- */
-public class AdapterIdentifier implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String namespace;
-    private final String name;
-
-    public AdapterIdentifier(String namespace, String name) {
-        this.namespace = namespace;
-        this.name = name;
-    }
-
-    public String getNamespace() {
-        return namespace;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public int hashCode() {
-        return (namespace + "@" + name).hashCode();
-
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) {
-            return false;
-        }
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof AdapterIdentifier)) {
-            return false;
-        }
-        return namespace.equals(((AdapterIdentifier) o).getNamespace())
-                && name.equals(((AdapterIdentifier) o).getName());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
deleted file mode 100644
index aacb3da..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-
-public class AdapterRuntimeManager implements IAdapterRuntimeManager {
-
-    private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
-
-    private final FeedId feedId;
-
-    private final IDataSourceAdapter feedAdapter;
-
-    private final IIntakeProgressTracker tracker;
-
-    private final AdapterExecutor adapterExecutor;
-
-    private final int partition;
-
-    private final ExecutorService executorService;
-
-    private IngestionRuntime ingestionRuntime;
-
-    private State state;
-
-    public AdapterRuntimeManager(FeedId feedId, IDataSourceAdapter feedAdapter, IIntakeProgressTracker tracker,
-            DistributeFeedFrameWriter writer, int partition) {
-        this.feedId = feedId;
-        this.feedAdapter = feedAdapter;
-        this.tracker = tracker;
-        this.partition = partition;
-        this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
-        this.executorService = Executors.newSingleThreadExecutor();
-        this.state = State.INACTIVE_INGESTION;
-    }
-
-    @Override
-    public void start() throws Exception {
-        state = State.ACTIVE_INGESTION;
-        executorService.execute(adapterExecutor);
-    }
-
-    @Override
-    public void stop() {
-        try {
-            feedAdapter.stop();
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe("Unable to stop adapter " + feedAdapter + ", encountered exception " + exception);
-            }
-        } finally {
-            state = State.FINISHED_INGESTION;
-            executorService.shutdown();
-        }
-    }
-
-    @Override
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public String toString() {
-        return feedId + "[" + partition + "]";
-    }
-
-    @Override
-    public IDataSourceAdapter getFeedAdapter() {
-        return feedAdapter;
-    }
-
-    public IIntakeProgressTracker getTracker() {
-        return tracker;
-    }
-
-    @Override
-    public synchronized State getState() {
-        return state;
-    }
-
-    @Override
-    public synchronized void setState(State state) {
-        this.state = state;
-    }
-
-    public AdapterExecutor getAdapterExecutor() {
-        return adapterExecutor;
-    }
-
-    @Override
-    public int getPartition() {
-        return partition;
-    }
-
-    public IngestionRuntime getIngestionRuntime() {
-        return ingestionRuntime;
-    }
-
-    @Override
-    public IIntakeProgressTracker getProgressTracker() {
-        return tracker;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
index a144d5f..8ef6732 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -21,37 +21,37 @@ package org.apache.asterix.metadata.feeds;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
-import org.apache.asterix.metadata.entities.FeedPolicy;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 
 public class BuiltinFeedPolicies {
 
-    public static final FeedPolicy BRITTLE = initializeBrittlePolicy();
+    public static final FeedPolicyEntity BRITTLE = initializeBrittlePolicy();
 
-    public static final FeedPolicy BASIC = initializeBasicPolicy();
+    public static final FeedPolicyEntity BASIC = initializeBasicPolicy();
 
-    public static final FeedPolicy BASIC_FT = initializeBasicFTPolicy();
+    public static final FeedPolicyEntity BASIC_FT = initializeBasicFTPolicy();
 
-    public static final FeedPolicy ADVANCED_FT = initializeAdvancedFTPolicy();
+    public static final FeedPolicyEntity ADVANCED_FT = initializeAdvancedFTPolicy();
 
-    public static final FeedPolicy ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
+    public static final FeedPolicyEntity ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
 
-    public static final FeedPolicy ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
+    public static final FeedPolicyEntity ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
 
-    public static final FeedPolicy ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
+    public static final FeedPolicyEntity ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
 
-    public static final FeedPolicy ELASTIC = initializeAdvancedFTElasticPolicy();
+    public static final FeedPolicyEntity ELASTIC = initializeAdvancedFTElasticPolicy();
 
-    public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
+    public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
             ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, ELASTIC };
 
-    public static final FeedPolicy DEFAULT_POLICY = BASIC_FT;
+    public static final FeedPolicyEntity DEFAULT_POLICY = BASIC_FT;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
-    public static FeedPolicy getFeedPolicy(String policyName) {
-        for (FeedPolicy policy : policies) {
+    public static FeedPolicyEntity getFeedPolicy(String policyName) {
+        for (FeedPolicyEntity policy : policies) {
             if (policy.getPolicyName().equalsIgnoreCase(policyName)) {
                 return policy;
             }
@@ -60,7 +60,7 @@ public class BuiltinFeedPolicies {
     }
 
     //Brittle
-    private static FeedPolicy initializeBrittlePolicy() {
+    private static FeedPolicyEntity initializeBrittlePolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
@@ -71,11 +71,11 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
 
         String description = "Brittle";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
     }
 
     //Basic
-    private static FeedPolicy initializeBasicPolicy() {
+    private static FeedPolicyEntity initializeBasicPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -85,11 +85,11 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
 
         String description = "Basic";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
     }
 
     // BasicFT
-    private static FeedPolicy initializeBasicFTPolicy() {
+    private static FeedPolicyEntity initializeBasicFTPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -103,11 +103,11 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
 
         String description = "Basic Monitored Fault-Tolerant";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
     }
 
     // AdvancedFT
-    private static FeedPolicy initializeAdvancedFTPolicy() {
+    private static FeedPolicyEntity initializeAdvancedFTPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -118,11 +118,11 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
 
         String description = "Basic Monitored Fault-Tolerant with at least once semantics";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
     }
 
     // AdvancedFT_Discard
-    private static FeedPolicy initializeAdvancedFTDiscardPolicy() {
+    private static FeedPolicyEntity initializeAdvancedFTDiscardPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -133,14 +133,14 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
         policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
-       
+
         String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
                 policyParams);
     }
 
     // AdvancedFT_Spill
-    private static FeedPolicy initializeAdvancedFTSpillPolicy() {
+    private static FeedPolicyEntity initializeAdvancedFTSpillPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -152,11 +152,11 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
 
         String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
     }
 
     // AdvancedFT_Spill
-    private static FeedPolicy initializeAdvancedFTThrottlePolicy() {
+    private static FeedPolicyEntity initializeAdvancedFTThrottlePolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -169,12 +169,12 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
 
         String description = "AdvancedFT Throttle during congestion";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
                 policyParams);
     }
 
     // AdvancedFT_Elastic
-    private static FeedPolicy initializeAdvancedFTElasticPolicy() {
+    private static FeedPolicyEntity initializeAdvancedFTElasticPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
@@ -185,7 +185,7 @@ public class BuiltinFeedPolicies {
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
 
         String description = "Basic Monitored Fault-Tolerant Elastic";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
                 policyParams);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
deleted file mode 100644
index 30b369a..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
-
-    private final FeedConnectionId connectionId;
-    private IFrameWriter downstreamWriter;
-    private final FrameTupleAccessor inputFrameTupleAccessor;
-    private final FrameTupleAppender tupleAppender;
-    private final IFrame frame;
-
-    private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-
-    public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
-            ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
-            throws HyracksDataException {
-        this.downstreamWriter = downstreamWriter;
-        RecordDescriptor inputRecordDescriptor = sourceRuntime.getRecordDescriptor();
-        inputFrameTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
-        tupleAppender = new FrameTupleAppender();
-        frame = new VSizeFrame(ctx);
-        tupleAppender.reset(frame, true);
-        this.connectionId = connectionId;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        downstreamWriter.open();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inputFrameTupleAccessor.reset(buffer);
-        int nTuple = inputFrameTupleAccessor.getTupleCount();
-        for (int t = 0; t < nTuple; t++) {
-            tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
-            appendTupleToFrame();
-            tupleBuilder.reset();
-        }
-    }
-
-    private void appendTupleToFrame() throws HyracksDataException {
-        if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                tupleBuilder.getSize())) {
-            FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
-            tupleAppender.reset(frame, true);
-            if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                    tupleBuilder.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        downstreamWriter.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        downstreamWriter.close();
-    }
-
-    @Override
-    public FeedId getFeedId() {
-        return connectionId.getFeedId();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
-    }
-
-    public IFrameWriter getDownstreamWriter() {
-        return downstreamWriter;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public void reset(IFrameWriter writer) {
-        this.downstreamWriter = writer;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
deleted file mode 100644
index fee99d8..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/*
- * A single activity operator that provides the functionality of scanning data using an
- * instance of the configured adapter.
- */
-public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private IAdapterFactory adapterFactory;
-
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
-            IAdapterFactory dataSourceAdapterFactory) {
-        super(spec, 0, 1);
-        recordDescriptors[0] = rDesc;
-        this.adapterFactory = dataSourceAdapterFactory;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                    throws HyracksDataException {
-
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
-            @Override
-            public void initialize() throws HyracksDataException {
-                IDataSourceAdapter adapter = null;
-                try {
-                    writer.open();
-                    adapter = adapterFactory.createAdapter(ctx, partition);
-                    adapter.start(partition, writer);
-                } catch (Throwable th) {
-                    writer.fail();
-                    throw new HyracksDataException(th);
-                } finally {
-                    writer.close();
-                }
-            }
-        };
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
deleted file mode 100644
index a0a4af9..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedActivityIdFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FeedActivityIdFactory {
-    private static AtomicInteger id = new AtomicInteger();
-    private static boolean isInitialized = false;
-
-    public static boolean isInitialized() {
-        return isInitialized;
-    }
-
-    public static void initialize(int initialId) {
-        id.set(initialId);
-        isInitialized = true;
-    }
-
-    public static int generateFeedActivityId() {
-        return id.incrementAndGet();
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
deleted file mode 100644
index 715b68b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * FeedCollectOperatorDescriptor is responsible for ingesting data from an external source. This
- * operator uses a user specified or a built-in adaptor for retrieving data from the external
- * data source.
- */
-public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
-
-    /** The type associated with the ADM data output from the feed adaptor */
-    private final IAType outputType;
-
-    /** unique identifier for a feed instance. */
-    private final FeedConnectionId connectionId;
-
-    /** Map representation of policy parameters */
-    private final Map<String, String> feedPolicyProperties;
-
-    /** The (singleton) instance of {@code IFeedSubscriptionManager} **/
-    private IFeedSubscriptionManager subscriptionManager;
-
-    /** The source feed from which the feed derives its data. **/
-    private final FeedId sourceFeedId;
-
-    /** The subscription location at which the recipient feed receives tuples from the source feed **/
-    private final ConnectionLocation subscriptionLocation;
-
-    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
-            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            ConnectionLocation subscriptionLocation) {
-        super(spec, 0, 1);
-        recordDescriptors[0] = rDesc;
-        this.outputType = atype;
-        this.connectionId = feedConnectionId;
-        this.feedPolicyProperties = feedPolicyProperties;
-        this.sourceFeedId = sourceFeedId;
-        this.subscriptionLocation = subscriptionLocation;
-    }
-
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.subscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
-        ISubscribableRuntime sourceRuntime = null;
-        IOperatorNodePushable nodePushable = null;
-        switch (subscriptionLocation) {
-            case SOURCE_FEED_INTAKE_STAGE:
-                try {
-                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                            FeedRuntimeType.INTAKE, partition);
-                    sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
-                    if (sourceRuntime == null) {
-                        throw new HyracksDataException("Source intake task not found for source feed id "
-                                + sourceFeedId);
-                    }
-                    nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
-                            feedPolicyProperties, partition, nPartitions, sourceRuntime);
-
-                } catch (Exception exception) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
-                    }
-                    throw new HyracksDataException("Initialization of the feed adapter failed", exception);
-                }
-                break;
-            case SOURCE_FEED_COMPUTE_STAGE:
-                SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                        FeedRuntimeType.COMPUTE, partition);
-                sourceRuntime = (ISubscribableRuntime) subscriptionManager
-                        .getSubscribableRuntime(feedSubscribableRuntimeId);
-                if (sourceRuntime == null) {
-                    throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
-                            + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
-                }
-                nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
-                        feedPolicyProperties, partition, nPartitions, sourceRuntime);
-                break;
-        }
-        return nodePushable;
-    }
-
-    public FeedConnectionId getFeedConnectionId() {
-        return connectionId;
-    }
-
-    public Map<String, String> getFeedPolicyProperties() {
-        return feedPolicyProperties;
-    }
-
-    public IAType getOutputType() {
-        return outputType;
-    }
-
-    public RecordDescriptor getRecordDescriptor() {
-        return recordDescriptors[0];
-    }
-
-    public FeedId getSourceFeedId() {
-        return sourceFeedId;
-    }
-
-    private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
-        int waitCycleCount = 0;
-        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 10) {
-            try {
-                Thread.sleep(2000);
-                waitCycleCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                break;
-            }
-            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        }
-        return (IngestionRuntime) ingestionRuntime;
-    }
-
-    public ConnectionLocation getSubscriptionLocation() {
-        return subscriptionLocation;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
deleted file mode 100644
index 8f9c8f3..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * The runtime for {@link FeedCollectOperatorDescriptor}
- */
-public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
-    private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
-
-    private final int partition;
-    private final FeedConnectionId connectionId;
-    private final Map<String, String> feedPolicy;
-    private final FeedPolicyAccessor policyAccessor;
-    private final IFeedManager feedManager;
-    private final ISubscribableRuntime sourceRuntime;
-    private final IHyracksTaskContext ctx;
-    private final int nPartitions;
-
-    private RecordDescriptor outputRecordDescriptor;
-    private FeedRuntimeInputHandler inputSideHandler;
-    private CollectionRuntime collectRuntime;
-
-    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
-            FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
-            ISubscribableRuntime sourceRuntime) {
-        this.ctx = ctx;
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        this.sourceRuntime = sourceRuntime;
-        this.feedPolicy = feedPolicy;
-        policyAccessor = new FeedPolicyAccessor(feedPolicy);
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-    }
-
-    @Override
-    public void initialize() throws HyracksDataException {
-        try {
-            outputRecordDescriptor = recordDesc;
-            FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
-                    .getFeedRuntimeType();
-            switch (sourceRuntimeType) {
-                case INTAKE:
-                    handleCompleteConnection();
-                    break;
-                case COMPUTE:
-                    handlePartialConnection();
-                    break;
-                default:
-                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
-            }
-
-            State state = collectRuntime.waitTillCollectionOver();
-            if (state.equals(State.FINISHED)) {
-                feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                        collectRuntime.getRuntimeId());
-                writer.close();
-                inputSideHandler.close();
-            } else if (state.equals(State.HANDOVER)) {
-                inputSideHandler.setMode(Mode.STALL);
-                writer.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
-                            + " and the output writer " + writer + " has been closed ");
-                }
-            }
-        } catch (InterruptedException ie) {
-            handleInterruptedException(ie);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void handleCompleteConnection() throws Exception {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
-                FeedRuntimeId.DEFAULT_OPERAND_ID);
-        collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
-                runtimeId);
-        if (collectRuntime == null) {
-            beginNewFeed(runtimeId);
-        } else {
-            reviveOldFeed();
-        }
-    }
-
-    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
-        writer.open();
-        IFrameWriter outputSideWriter = writer;
-        if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType().equals(
-                FeedRuntimeType.COMPUTE)) {
-            outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
-                    connectionId);
-            this.recordDesc = sourceRuntime.getRecordDescriptor();
-        }
-
-        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
-        inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter, policyAccessor,
-                false,  tupleAccessor, recordDesc,
-                feedManager, nPartitions);
-
-        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
-        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
-    }
-
-    private void reviveOldFeed() throws HyracksDataException {
-        writer.open();
-        collectRuntime.getFrameCollector().setState(State.ACTIVE);
-        inputSideHandler = collectRuntime.getInputHandler();
-
-        IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
-        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
-            ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
-        } else {
-            inputSideHandler.setCoreOperator(writer);
-        }
-
-        inputSideHandler.setMode(Mode.PROCESS);
-    }
-
-    private void handlePartialConnection() throws Exception {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
-                FeedRuntimeId.DEFAULT_OPERAND_ID);
-        writer.open();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
-        }
-        IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
-                outputRecordDescriptor, connectionId);
-
-        inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
-                 new FrameTupleAccessor(recordDesc), recordDesc, feedManager,
-                nPartitions);
-
-        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
-        recordDesc = sourceRuntime.getRecordDescriptor();
-        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
-    }
-
-    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
-        if (policyAccessor.continueOnHardwareFailure()) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
-                        + " until failure is resolved");
-            }
-            inputSideHandler.setMode(Mode.STALL);
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
-                        + " as feed is not configured to handle failures");
-            }
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
-            writer.close();
-            throw new HyracksDataException(ie);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
deleted file mode 100644
index 7356c94..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedConnectionManager.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-import org.apache.asterix.common.feeds.api.IFeedConnectionManager;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedConnectionManager implements IFeedConnectionManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
-
-    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
-    private final String nodeId;
-
-    public FeedConnectionManager(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
-        return feedRuntimeManagers.get(feedId);
-    }
-
-    @Override
-    public void deregisterFeed(FeedConnectionId feedId) {
-        try {
-            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
-            if (mgr != null) {
-                mgr.close();
-                feedRuntimeManagers.remove(feedId);
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
-            }
-        }
-
-    }
-
-    @Override
-    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
-            throws Exception {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        if (runtimeMgr == null) {
-            runtimeMgr = new FeedRuntimeManager(connectionId, this);
-            feedRuntimeManagers.put(connectionId, runtimeMgr);
-        }
-        runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
-    }
-
-    @Override
-    public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        if (runtimeMgr != null) {
-            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
-        }
-    }
-
-    @Override
-    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedManager " + "[" + nodeId + "]";
-    }
-
-    @Override
-    public List<FeedRuntimeId> getRegisteredRuntimes() {
-        List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
-        for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
-            runtimes.addAll(entry.getValue().getFeedRuntimes());
-        }
-        return runtimes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
deleted file mode 100644
index 6ee14d8..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedFrameTupleDecorator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public class FeedFrameTupleDecorator {
-
-    private AMutableString aString = new AMutableString("");
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AtomicInteger tupleId;
-
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-
-    private final int partition;
-    private final ArrayBackedValueStorage attrNameStorage;
-    private final ArrayBackedValueStorage attrValueStorage;
-
-    public FeedFrameTupleDecorator(int partition) {
-        this.tupleId = new AtomicInteger(0);
-        this.partition = partition;
-        this.attrNameStorage = new ArrayBackedValueStorage();
-        this.attrValueStorage = new ArrayBackedValueStorage();
-    }
-
-    public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
-            throws HyracksDataException, AsterixException {
-        attrNameStorage.reset();
-        aString.setValue(attrName);
-        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
-        attrValueStorage.reset();
-        aInt64.setValue(attrValue);
-        int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
-
-        recordBuilder.addField(attrNameStorage, attrValueStorage);
-    }
-
-    public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
-            throws HyracksDataException, AsterixException {
-        attrNameStorage.reset();
-        aString.setValue(attrName);
-        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
-        attrValueStorage.reset();
-        aInt32.setValue(attrValue);
-        int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
-
-        recordBuilder.addField(attrNameStorage, attrValueStorage);
-    }
-
-    public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
-    }
-
-    public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
-    }
-
-    public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
-    }
-
-    public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
deleted file mode 100644
index 54c9af5..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.library.ExternalLibraryManager;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * An operator responsible for establishing connection with external data source and parsing,
- * translating the received content.It uses an instance of feed adaptor to perform these functions.
- */
-public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
-
-    /** The unique identifier of the feed that is being ingested. **/
-    private final FeedId feedId;
-
-    private final FeedPolicyAccessor policyAccessor;
-
-    /** The adaptor factory that is used to create an instance of the feed adaptor **/
-    private IAdapterFactory adaptorFactory;
-
-    /** The library that contains the adapter in use. **/
-    private String adaptorLibraryName;
-
-    /**
-     * The adapter factory class that is used to create an instance of the feed adapter.
-     * This value is used only in the case of external adapters.
-     **/
-    private String adaptorFactoryClassName;
-
-    /** The configuration parameters associated with the adapter. **/
-    private Map<String, String> adaptorConfiguration;
-
-    private ARecordType adapterOutputType;
-
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, IAdapterFactory adapterFactory,
-            ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
-        super(spec, 0, 1);
-        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
-        this.adaptorFactory = adapterFactory;
-        this.adapterOutputType = adapterOutputType;
-        this.policyAccessor = policyAccessor;
-    }
-
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, String adapterLibraryName,
-            String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
-        super(spec, 0, 1);
-        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
-        this.adaptorFactoryClassName = adapterFactoryClassName;
-        this.adaptorLibraryName = adapterLibraryName;
-        this.adaptorConfiguration = primaryFeed.getAdaptorConfiguration();
-        this.adapterOutputType = adapterOutputType;
-        this.policyAccessor = policyAccessor;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        IFeedSubscriptionManager feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
-        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
-                partition);
-        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
-                .getSubscribableRuntime(feedIngestionId);
-        if (adaptorFactory == null) {
-            try {
-                adaptorFactory = createExtenralAdapterFactory(ctx, partition);
-            } catch (Exception exception) {
-                throw new HyracksDataException(exception);
-            }
-
-        }
-        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
-                policyAccessor);
-    }
-
-    private IAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
-        IAdapterFactory adapterFactory = null;
-        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
-                adaptorLibraryName);
-        if (classLoader != null) {
-            adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
-            adapterFactory.configure(adaptorConfiguration, adapterOutputType);
-        } else {
-            String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
-                    + " in dataverse " + feedId.getDataverse();
-            LOGGER.severe(message);
-            throw new IllegalArgumentException(message);
-        }
-        return adapterFactory;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-}