You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:05 UTC

[08/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
new file mode 100644
index 0000000..bd1d75c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -0,0 +1,143 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import twitter4j.FilterQuery;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class TwitterUtil {
+
+    public static class ConfigurationConstants {
+        public static final String KEY_LOCATION = "location";
+        public static final String LOCATION_US = "US";
+    }
+
+    public static class GeoConstants {
+        public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
+        public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+    }
+
+    private static Map<String, double[][]> initializeBoundingBoxes() {
+        Map<String, double[][]> boundingBoxes = new HashMap<String, double[][]>();
+        boundingBoxes.put(ConfigurationConstants.LOCATION_US, new double[][] { { -124.848974, 24.396308 },
+                { -66.885444, 49.384358 } });
+        return boundingBoxes;
+    }
+
+    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+        String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+        double[][] locations = null;
+        if (locationValue != null) {
+            if (locationValue.contains(",")) {
+                String[] coordinatesString = locationValue.trim().split(",");
+                locations = new double[2][2];
+                for (int i = 0; i < 2; i++) {
+                    for (int j = 0; j < 2; j++) {
+                        try {
+                            locations[i][j] = Double.parseDouble(coordinatesString[2 * i + j]);
+                        } catch (NumberFormatException ne) {
+                            throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * i + j]);
+                        }
+                    }
+                }
+            } else {
+                locations = GeoConstants.boundingBoxes.get(locationValue);
+            }
+            if (locations != null) {
+                FilterQuery filterQuery = new FilterQuery();
+                filterQuery.locations(locations);
+                return filterQuery;
+            }
+        }
+        return null;
+
+    }
+
+    public static Twitter getTwitterService(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterFactory tf = new TwitterFactory(cb.build());
+        Twitter twitter = tf.getInstance();
+        return twitter;
+    }
+
+    public static TwitterStream getTwitterStream(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterStreamFactory factory = new TwitterStreamFactory(cb.build());
+        return factory.getInstance();
+    }
+
+    private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true);
+        String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
+        String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+        String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+        String oAuthAccessTokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+
+        cb.setOAuthConsumerKey(oAuthConsumerKey);
+        cb.setOAuthConsumerSecret(oAuthConsumerSecret);
+        cb.setOAuthAccessToken(oAuthAccessToken);
+        cb.setOAuthAccessTokenSecret(oAuthAccessTokenSecret);
+        return cb;
+    }
+
+    public static void initializeConfigurationWithAuthInfo(Map<String, String> configuration) throws AsterixException {
+        String authMode = configuration.get(AuthenticationConstants.AUTHENTICATION_MODE);
+        if (authMode == null) {
+            authMode = AuthenticationConstants.AUTHENTICATION_MODE_FILE;
+        }
+        try {
+            switch (authMode) {
+                case AuthenticationConstants.AUTHENTICATION_MODE_FILE:
+                    Properties prop = new Properties();
+                    String authFile = configuration.get(AuthenticationConstants.OAUTH_AUTHENTICATION_FILE);
+                    if (authFile == null) {
+                        authFile = AuthenticationConstants.DEFAULT_AUTH_FILE;
+                    }
+                    InputStream in = TwitterUtil.class.getResourceAsStream(authFile);
+                    prop.load(in);
+                    in.close();
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_KEY,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_KEY));
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_SECRET));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET));
+                    break;
+                case AuthenticationConstants.AUTHENTICATION_MODE_EXPLICIT:
+                    break;
+            }
+        } catch (Exception e) {
+            throw new AsterixException("Incorrect configuration! unable to load authentication credentials "
+                    + e.getMessage());
+        }
+    }
+
+    public static final class AuthenticationConstants {
+        public static final String OAUTH_CONSUMER_KEY = "consumer.key";
+        public static final String OAUTH_CONSUMER_SECRET = "consumer.secret";
+        public static final String OAUTH_ACCESS_TOKEN = "access.token";
+        public static final String OAUTH_ACCESS_TOKEN_SECRET = "access.token.secret";
+        public static final String OAUTH_AUTHENTICATION_FILE = "authentication.file";
+        public static final String AUTHENTICATION_MODE = "authentication.mode";
+        public static final String AUTHENTICATION_MODE_FILE = "file";
+        public static final String AUTHENTICATION_MODE_EXPLICIT = "explicit";
+        public static final String DEFAULT_AUTH_FILE = "/feed/twitter/auth.properties"; // default authentication file
+    }
+
+    public static final class SearchAPIConstants {
+        public static final String QUERY = "query";
+        public static final String INTERVAL = "interval";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
new file mode 100644
index 0000000..90fddcd
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
+import edu.uci.ics.asterix.external.library.IFunctionFactory;
+
+public class AddHashTagsFactory implements IFunctionFactory {
+
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsFunction();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
new file mode 100644
index 0000000..93a87f5
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+public class AddHashTagsFunction implements IExternalScalarFunction {
+
+    private JUnorderedList list = null;
+    private JPoint location = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+        location = new JPoint(0, 0);
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+        JDouble latitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+        JDouble longitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
+        if (latitude != null && longitude != null) {
+            location.setValue(latitude.getValue(), longitude.getValue());
+        }
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+
+        JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+        outputRecord.setField(Datatypes.Tweet.ID, inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+        JRecord userRecord = (JRecord) inputRecord.getValueByName(Datatypes.Tweet.USER);
+        outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+                userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+        outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+        outputRecord.setField(Datatypes.Tweet.CREATED_AT, inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+        outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+        outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(outputRecord);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
new file mode 100644
index 0000000..a2bec69
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class AddHashTagsInPlaceFactory implements IFunctionFactory {
+
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsInPlaceFunction();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
new file mode 100644
index 0000000..a3f6f70
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -0,0 +1,54 @@
+/*  1
+ * Copyright 2009-2013 by The Regents of the University of California   2
+             * Licensed under the Apache License, Version 2.0 (the "License");  3
+ * you may not use this file except in compliance with the License. 4
+ * you may obtain a copy of the License from    5
+ *  6
+ *     http://www.apache.org/licenses/LICENSE-2.0   
+ *  
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS,    
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and  
+ * limitations under the License.   
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+public class AddHashTagsInPlaceFunction implements IExternalScalarFunction {
+
+    private JUnorderedList list = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(inputRecord);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
index 58995c2..ec04541 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
@@ -53,9 +53,6 @@ public class UpperCaseFunction implements IExternalScalarFunction {
         JRecord result = (JRecord) functionHelper.getResultObject();
         result.setField("id", id);
         result.setField("text", text);
-        JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
-        newField.setValue(text.getValue().substring(random.nextInt(text.getValue().length())));
-        result.addField("substring", newField);
         functionHelper.setResult(result);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
index 07f1a40..7a2597e 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
@@ -23,8 +23,9 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter.DataExchangeMode;
 import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,8 +44,8 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
     private DummyGenerator generator;
 
     public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
-            Map<String, String> configuration) throws IOException {
-        super(parserFactory, sourceDatatype, ctx);
+            Map<String, String> configuration, int partition) throws IOException {
+        super(parserFactory, sourceDatatype, ctx, partition);
         pos = new PipedOutputStream();
         pis = new PipedInputStream(pos);
         this.configuration = configuration;
@@ -131,4 +132,9 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
         generator.stop();
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index b042e9c..5416ce2 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -17,19 +17,23 @@ package edu.uci.ics.asterix.external.library.adapter;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+public class TestTypedAdapterFactory implements IFeedAdapterFactory {
 
     /**
      * 
@@ -38,7 +42,7 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
 
     public static final String NAME = "test_typed_adapter";
 
-    private static ARecordType adapterOutputType = initOutputType();
+    private ARecordType outputType;
 
     public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
 
@@ -50,7 +54,7 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
     }
 
     private static ARecordType initOutputType() {
-        String[] fieldNames = new String[] { "tweetid", "message-text" };
+        String[] fieldNames = new String[] { "id", "message-text" };
         IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
         ARecordType outputType = null;
         try {
@@ -67,29 +71,36 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         return new AlgebricksCountPartitionConstraint(1);
     }
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
-        return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
+        ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
+                InputDataFormat.ADM);
+        return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
     }
 
     @Override
     public ARecordType getAdapterOutputType() {
-        return adapterOutputType;
+        return outputType;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
+        this.outputType = outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/resources/library_descriptor.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/resources/library_descriptor.xml b/asterix-external-data/src/test/resources/library_descriptor.xml
new file mode 100644
index 0000000..e35288f
--- /dev/null
+++ b/asterix-external-data/src/test/resources/library_descriptor.xml
@@ -0,0 +1,76 @@
+<externalLibrary xmlns="library">
+	<language>JAVA</language>
+	<libraryFunctions>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>parseTweet</name>
+			<arguments>TweetInputType</arguments>
+			<return_type>TweetOutputType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.ParseTweetFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>addHashTags</name>
+			<arguments>Tweet</arguments>
+			<return_type>ProcessedTweet</return_type>
+			<definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>addHashTagsInPlace</name>
+			<arguments>Tweet</arguments>
+			<return_type>ProcessedTweet</return_type>
+			<definition>edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>mysum</name>
+			<arguments>AINT32,AINT32</arguments>
+			<return_type>AINT32</return_type>
+			<definition>edu.uci.ics.asterix.external.library.SumFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>getCapital</name>
+			<arguments>ASTRING</arguments>
+			<return_type>CountryCapitalType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.CapitalFinderFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>toUpper</name>
+			<arguments>TextType</arguments>
+			<return_type>TextType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.UpperCaseFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>allTypes</name>
+			<arguments>AllType</arguments>
+			<return_type>AllType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.AllTypesFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>echoDelay</name>
+			<arguments>TweetMessageType</arguments>
+			<return_type>TweetMessageType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
+			</definition>
+		</libraryFunction>
+	</libraryFunctions>
+	<libraryAdapters>
+		<libraryAdapter>
+			<name>test_typed_adapter</name>
+			<factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory
+			</factory_class>
+		</libraryAdapter>
+	</libraryAdapters>
+</externalLibrary>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/resources/text_functions.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
deleted file mode 100644
index 8c7a92c..0000000
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<externalLibrary xmlns="library">
-	<language>JAVA</language>
-	<libraryFunctions>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>parseTweet</name>
-			<arguments>TweetInputType</arguments>
-			<return_type>TweetOutputType</return_type>
-			<definition>edu.uci.ics.asterix.external.library.ParseTweetFactory
-			</definition>
-		</libraryFunction>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>mysum</name>
-			<arguments>AINT32,AINT32</arguments>
-			<return_type>AINT32</return_type>
-			<definition>edu.uci.ics.asterix.external.library.SumFactory
-			</definition>
-		</libraryFunction>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>getCapital</name>
-			<arguments>ASTRING</arguments>
-			<return_type>CountryCapitalType</return_type>
-			<definition>edu.uci.ics.asterix.external.library.CapitalFinderFactory
-			</definition>
-		</libraryFunction>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>toUpper</name>
-			<arguments>TextType</arguments>
-			<return_type>TextType</return_type>
-			<definition>edu.uci.ics.asterix.external.library.UpperCaseFactory
-			</definition>
-		</libraryFunction>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>allTypes</name>
-			<arguments>AllType</arguments>
-			<return_type>AllType</return_type>
-			<definition>edu.uci.ics.asterix.external.library.AllTypesFactory
-			</definition>
-		</libraryFunction>
-		<libraryFunction>
-			<function_type>SCALAR</function_type>
-			<name>echoDelay</name>
-			<arguments>TweetMessageType</arguments>
-			<return_type>TweetMessageType</return_type>
-			<definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
-			</definition>
-		</libraryFunction>
-	</libraryFunctions>
-	<libraryAdapters>
-		<libraryAdapter>
-			<name>test_typed_adapter</name>
-			<factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory</factory_class>
-		</libraryAdapter>
-	</libraryAdapters>
-</externalLibrary>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
deleted file mode 100644
index c532de5..0000000
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.installer.test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runners.Parameterized.Parameters;
-
-import edu.uci.ics.asterix.test.aql.TestsUtils;
-import edu.uci.ics.asterix.testframework.context.TestCaseContext;
-
-public class AsterixFaultToleranceIT {
-
-    private static final String PATH_BASE = "src/test/resources/integrationts/fault-tolerance";
-    private static final String PATH_ACTUAL = "ittest/";
-    private static final Logger LOGGER = Logger.getLogger(AsterixFaultToleranceIT.class.getName());
-    private static List<TestCaseContext> testCaseCollection;
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        AsterixInstallerIntegrationUtil.init();
-        TestCaseContext.Builder b = new TestCaseContext.Builder();
-        testCaseCollection = b.build(new File(PATH_BASE));
-        File outdir = new File(PATH_ACTUAL);
-        outdir.mkdirs();
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        AsterixInstallerIntegrationUtil.deinit();
-        File outdir = new File(PATH_ACTUAL);
-        File[] files = outdir.listFiles();
-        if (files == null || files.length == 0) {
-            outdir.delete();
-        }
-    }
-
-    @Parameters
-    public static Collection<Object[]> tests() throws Exception {
-        Collection<Object[]> testArgs = new ArrayList<Object[]>();
-        return testArgs;
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (TestCaseContext testCaseCtx : testCaseCollection) {
-            TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        try {
-            setUp();
-            new AsterixFaultToleranceIT().test();
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.info("TEST CASE(S) FAILED");
-        } finally {
-            tearDown();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
deleted file mode 100644
index 1f0678e..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
+++ /dev/null
@@ -1,28 +0,0 @@
-drop dataverse feeds if exists;
-create dataverse feeds;
-use dataverse feeds;
-
-create type TwitterUserType as closed {
-	screen-name: string,
-	lang: string,
-	friends_count: int32,
-	statuses_count: int32,
-	name: string,
-	followers_count: int32
-} 
-
-create type TweetMessageType as closed {
-	tweetid: int64,
-        user: TwitterUserType,
-        sender-location: point,
-	send-time: datetime,
-        referred-topics: {{ string }},
-	message-text: string
-}
-
-create dataset Tweets(TweetMessageType)
-primary key tweetid;
-
-create feed  TwitterFirehose
-using twitter_firehose
-(("duration"="30"),("tps"="50"),("tput-duration"="5"),("mode"="controlled"));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
deleted file mode 100644
index 64dbf25..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
+++ /dev/null
@@ -1,3 +0,0 @@
-use dataverse feeds;
-
-connect feed TwitterFirehose to dataset Tweets;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
deleted file mode 100644
index 5caff40..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
+++ /dev/null
@@ -1 +0,0 @@
-10000

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
deleted file mode 100644
index 2d8a23e..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
+++ /dev/null
@@ -1 +0,0 @@
-stop -n asterix

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
deleted file mode 100644
index 4e99f33..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
+++ /dev/null
@@ -1 +0,0 @@
-start -n asterix

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
deleted file mode 100644
index c5da56a..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
+++ /dev/null
@@ -1 +0,0 @@
-40000

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
deleted file mode 100644
index d03b9fe..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
+++ /dev/null
@@ -1,10 +0,0 @@
-use dataverse feeds;
-
-let $numTuples:=count(for $x in dataset Tweets
-return $x)
-let $result:=if($numTuples > 225)
-then 
- 1
-else
- 0
-return $result

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm b/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
deleted file mode 100644
index 3c13d5f..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
+++ /dev/null
@@ -1,2 +0,0 @@
-[ 1
- ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm b/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
deleted file mode 100644
index 3c13d5f..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
+++ /dev/null
@@ -1,2 +0,0 @@
-[ 1
- ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml b/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
deleted file mode 100644
index 0d9ed23..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-<test-suite xmlns="urn:xml.testframework.asterix.ics.uci.edu" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
-  <test-group name="fault-tolerance">
-    <test-case FilePath="feeds">
-      <compilation-unit name="IN1-cluster-restart">
-        <output-dir compare="Text">IN1-cluster-restart</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
-</test-suite>
-

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
index b3bf441..a9bcf09 100644
--- a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
@@ -1,7 +1,9 @@
-[ { "DataverseName": "externallibtest", "Name": "testlib#allTypes", "Arity": "1", "Params": [ "AllType" ], "ReturnType": "AllType", "Definition": "edu.uci.ics.asterix.external.library.AllTypesFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#echoDelay", "Arity": "1", "Params": [ "TweetMessageType" ], "ReturnType": "TweetMessageType", "Definition": "edu.uci.ics.asterix.external.library.EchoDelayFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#getCapital", "Arity": "1", "Params": [ "ASTRING" ], "ReturnType": "CountryCapitalType", "Definition": "edu.uci.ics.asterix.external.library.CapitalFinderFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
+[ { "DataverseName": "externallibtest", "Name": "testlib#addHashTags", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#addHashTagsInPlace", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#allTypes", "Arity": "1", "Params": [ "AllType" ], "ReturnType": "AllType", "Definition": "edu.uci.ics.asterix.external.library.AllTypesFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#echoDelay", "Arity": "1", "Params": [ "TweetMessageType" ], "ReturnType": "TweetMessageType", "Definition": "edu.uci.ics.asterix.external.library.EchoDelayFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#getCapital", "Arity": "1", "Params": [ "ASTRING" ], "ReturnType": "CountryCapitalType", "Definition": "edu.uci.ics.asterix.external.library.CapitalFinderFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory", "Language": "JAVA", "Kind": "SCALAR" }
  ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index ef21a16..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -39,7 +39,7 @@
     </test-case>
   </test-group>
   <test-group name="library-feeds">
-    <test-case FilePath="library-feeds" category="slow">
+    <test-case FilePath="library-feeds">
       <compilation-unit name="feed_ingest">
         <output-dir compare="Text">feed_ingest</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index 6eb60f0..12e50dc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
@@ -32,7 +31,6 @@ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -64,9 +62,7 @@ public class MetadataCache {
     protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
     // Key is adapter dataverse. Key of value map is the adapter name  
     protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
-    // Key is FeedId   
-    protected final Map<FeedConnectionId, FeedActivity> feedActivity = new HashMap<FeedConnectionId, FeedActivity>();
-
+  
     // Key is DataverseName, Key of the value map is the Policy name   
     protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
     // Key is library dataverse. Key of value map is the library name
@@ -110,7 +106,6 @@ public class MetadataCache {
                         synchronized (datatypes) {
                             synchronized (functions) {
                                 synchronized (adapters) {
-                                    synchronized (feedActivity) {
                                         synchronized (libraries) {
                                             synchronized (compactionPolicies) {
                                                 dataverses.clear();
@@ -120,7 +115,6 @@ public class MetadataCache {
                                                 datatypes.clear();
                                                 functions.clear();
                                                 adapters.clear();
-                                                feedActivity.clear();
                                                 libraries.clear();
                                                 compactionPolicies.clear();
                                             }
@@ -133,7 +127,7 @@ public class MetadataCache {
                 }
             }
         }
-    }
+    
 
     public Object addDataverseIfNotExists(Dataverse dataverse) {
         synchronized (dataverses) {
@@ -240,7 +234,6 @@ public class MetadataCache {
                         synchronized (functions) {
                             synchronized (adapters) {
                                 synchronized (libraries) {
-                                    synchronized (feedActivity) {
                                         synchronized (feeds) {
                                             synchronized (compactionPolicies) {
                                                 datasets.remove(dataverse.getDataverseName());
@@ -257,19 +250,8 @@ public class MetadataCache {
                                                 for (FunctionSignature signature : markedFunctionsForRemoval) {
                                                     functions.remove(signature);
                                                 }
-                                                List<FeedConnectionId> feedActivitiesMarkedForRemoval = new ArrayList<FeedConnectionId>();
-                                                for (FeedConnectionId fid : feedActivity.keySet()) {
-                                                    if (fid.getDataverse().equals(dataverse.getDataverseName())) {
-                                                        feedActivitiesMarkedForRemoval.add(fid);
-                                                    }
-                                                }
-                                                for (FeedConnectionId fid : feedActivitiesMarkedForRemoval) {
-                                                    feedActivity.remove(fid);
-                                                }
-
                                                 libraries.remove(dataverse.getDataverseName());
                                                 feeds.remove(dataverse.getDataverseName());
-
                                                 return dataverses.remove(dataverse.getDataverseName());
                                             }
                                         }
@@ -281,7 +263,7 @@ public class MetadataCache {
                 }
             }
         }
-    }
+    
 
     public Object dropDataset(Dataset dataset) {
         synchronized (datasets) {
@@ -503,9 +485,9 @@ public class MetadataCache {
                 adaptersInDataverse = new HashMap<String, DatasourceAdapter>();
                 adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse);
             }
-            DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getAdapterName());
+            DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getName());
             if (adapterObject == null) {
-                return adaptersInDataverse.put(adapter.getAdapterIdentifier().getAdapterName(), adapter);
+                return adaptersInDataverse.put(adapter.getAdapterIdentifier().getName(), adapter);
             }
             return null;
         }
@@ -516,28 +498,14 @@ public class MetadataCache {
             Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
                     .getNamespace());
             if (adaptersInDataverse != null) {
-                return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getAdapterName());
+                return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
             }
             return null;
         }
     }
 
-    public Object addFeedActivityIfNotExists(FeedActivity fa) {
-        synchronized (feedActivity) {
-            FeedConnectionId fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
-            if (!feedActivity.containsKey(fid)) {
-                feedActivity.put(fid, fa);
-            }
-        }
-        return null;
-    }
+  
 
-    public Object dropFeedActivity(FeedActivity fa) {
-        synchronized (feedActivity) {
-            FeedConnectionId fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
-            return feedActivity.remove(fid);
-        }
-    }
 
     public Object addLibraryIfNotExists(Library library) {
         synchronized (libraries) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index dbfe32c..4842934 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -37,8 +36,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -717,29 +714,7 @@ public class MetadataManager implements IMetadataManager {
         }
         return adapter;
     }
-
-    @Override
-    public void registerFeedActivity(MetadataTransactionContext ctx, FeedConnectionId feedId, FeedActivity feedActivity)
-            throws MetadataException {
-        try {
-            metadataNode.registerFeedActivity(ctx.getJobId(), feedId, feedActivity);
-        } catch (RemoteException e) {
-            throw new MetadataException(e);
-        }
-    }
-
-    @Override
-    public FeedActivity getRecentActivityOnFeedConnection(MetadataTransactionContext ctx, FeedConnectionId feedId,
-            FeedActivityType... feedActivityTypes) throws MetadataException {
-
-        FeedActivity feedActivity = null;
-        try {
-            feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), feedId, feedActivityTypes);
-        } catch (RemoteException e) {
-            throw new MetadataException(e);
-        }
-        return feedActivity;
-    }
+  
 
     @Override
     public void dropLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName)
@@ -822,18 +797,7 @@ public class MetadataManager implements IMetadataManager {
         }
         return FeedPolicy;
     }
-
-    @Override
-    public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx, String dataverse, String dataset)
-            throws MetadataException {
-        List<FeedActivity> feedActivities = null;
-        try {
-            feedActivities = metadataNode.getActiveFeeds(ctx.getJobId(), dataverse, dataset);
-        } catch (RemoteException e) {
-            throw new MetadataException(e);
-        }
-        return feedActivities;
-    }
+   
 
     @Override
     public Feed getFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
@@ -848,12 +812,14 @@ public class MetadataManager implements IMetadataManager {
 
     @Override
     public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
+        Feed feed = null;
         try {
+            feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
             metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
-        ctx.dropFeed(dataverse, feedName);
+        ctx.dropFeed(feed);
     }
 
     @Override
@@ -866,28 +832,41 @@ public class MetadataManager implements IMetadataManager {
         ctx.addFeed(feed);
     }
 
-    @Override
-    public List<FeedActivity> getConnectFeedActivitiesForFeed(MetadataTransactionContext ctx, String dataverse,
-            String feedName) throws MetadataException {
-        List<FeedActivity> feedActivities = null;
+  
+    public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
+            throws MetadataException {
+        List<DatasourceAdapter> dataverseAdapters;
         try {
-            feedActivities = metadataNode.getDatasetsServedByFeed(ctx.getJobId(), dataverse, feedName);
+            dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
-        return feedActivities;
+        return dataverseAdapters;
     }
-
-    public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
+    
+    public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverseName, String policyName)
             throws MetadataException {
-        List<DatasourceAdapter> dataverseAdapters;
+        FeedPolicy feedPolicy = null;
         try {
-            dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
+            feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
+            metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
-        return dataverseAdapters;
+        mdTxnCtx.dropFeedPolicy(feedPolicy);
     }
+    
+    public List<FeedPolicy> getDataversePolicies(MetadataTransactionContext mdTxnCtx, String dataverse)
+            throws MetadataException {
+        List<FeedPolicy> dataverseFeedPolicies;
+        try {
+            dataverseFeedPolicies = metadataNode.getDataversePolicies(mdTxnCtx.getJobId(), dataverse);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        return dataverseFeedPolicies;
+    }
+
 
     @Override
     public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index a593348..0ff5455 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,19 +17,13 @@ package edu.uci.ics.asterix.metadata;
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.asterix.common.transactions.DatasetId;
@@ -51,8 +45,6 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -66,7 +58,6 @@ import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupl
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
-import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
@@ -74,7 +65,6 @@ import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.LibraryTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator;
-import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory;
 import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
@@ -374,7 +364,7 @@ public class MetadataNode implements IMetadataNode {
             if (dataverseAdapters != null && dataverseAdapters.size() > 0) {
                 // Drop all functions in this dataverse.
                 for (DatasourceAdapter adapter : dataverseAdapters) {
-                    dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getAdapterName());
+                    dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getName());
                 }
             }
 
@@ -388,6 +378,14 @@ public class MetadataNode implements IMetadataNode {
                     dropFeed(jobId, dataverseName, feed.getFeedName());
                 }
             }
+            
+            List<FeedPolicy> feedPolicies = getDataversePolicies(jobId, dataverseName);
+            if (feedPolicies != null && feedPolicies.size() > 0) {
+                // Drop all feed ingestion policies in this dataverse.
+                for (FeedPolicy feedPolicy : feedPolicies) {
+                    dropFeedPolicy(jobId, dataverseName, feedPolicy.getPolicyName());
+                }
+            }
 
             // Delete the dataverse entry from the 'dataverse' dataset.
             ITupleReference searchKey = createTuple(dataverseName);
@@ -1152,7 +1150,7 @@ public class MetadataNode implements IMetadataNode {
             insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple);
 
         } catch (TreeIndexDuplicateKeyException e) {
-            throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getAdapterName()
+            throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getName()
                     + " already exists in dataverse '" + adapter.getAdapterIdentifier().getNamespace() + "'.", e);
         } catch (Exception e) {
             throw new MetadataException(e);
@@ -1333,76 +1331,7 @@ public class MetadataNode implements IMetadataNode {
         return DatasetIdFactory.getMostRecentDatasetId();
     }
 
-    @Override
-    public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity)
-            throws MetadataException, RemoteException {
-        try {
-            if (!FeedActivityIdFactory.isInitialized()) {
-                initializeFeedActivityIdFactory(jobId);
-            }
-            feedActivity.setActivityId(FeedActivityIdFactory.generateFeedActivityId());
-            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
-            ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(feedActivity);
-            insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, tuple);
-        } catch (Exception e) {
-            throw new MetadataException(e);
-        }
-
-    }
-
-    @Override
-    public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivityType... activityType)
-            throws MetadataException, RemoteException {
-        try {
-            ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getFeedName(),
-                    feedId.getDatasetName());
-            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
-            List<FeedActivity> results = new ArrayList<FeedActivity>();
-            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
-                    tupleReaderWriter);
-            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
-            if (!results.isEmpty()) {
-                Collections.sort(results);
-                if (activityType == null) {
-                    return results.get(0);
-                } else {
-                    for (FeedActivity result : results) {
-                        for (FeedActivityType ft : activityType) {
-                            if (result.getActivityType().equals(ft)) {
-                                return result;
-                            }
-                        }
-                    }
-                }
-            }
-            return null;
-        } catch (Exception e) {
-            throw new MetadataException(e);
-        }
-    }
-
-    @Override
-    public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException {
-        try {
-            ITupleReference searchKey = createTuple();
-            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
-            List<FeedActivity> results = new ArrayList<FeedActivity>();
-            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
-                    tupleReaderWriter);
-            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
-            int maxActivityId = 0;
-            for (FeedActivity fa : results) {
-                if (maxActivityId < fa.getActivityId()) {
-                    maxActivityId = fa.getActivityId();
-                }
-            }
-            FeedActivityIdFactory.initialize(maxActivityId);
-        } catch (Exception e) {
-            throw new MetadataException(e);
-        }
-
-    }
-
+ 
     @Override
     public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
         try {
@@ -1439,57 +1368,7 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
-    @Override
-    public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
-            RemoteException {
-        List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
-        Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
-        boolean invalidArgs = (dataverse == null && dataset != null);
-        if (invalidArgs) {
-            throw new MetadataException("Invalid arguments " + dataverse + " " + dataset);
-        }
-        try {
-            ITupleReference searchKey = createTuple();
-            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
-            List<FeedActivity> results = new ArrayList<FeedActivity>();
-            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
-                    tupleReaderWriter);
-            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
-            Collections.sort(results); // recent activity first
-            FeedConnectionId fid = null;
-            Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>();
-            for (FeedActivity fa : results) {
-                if (dataverse != null) {
-                    if (dataset != null
-                            && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
-                        continue;
-                    }
-                }
-
-                fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
-                switch (fa.getActivityType()) {
-                    case FEED_BEGIN:
-                        if (!terminatedFeeds.contains(fid)) {
-                            if (aFeeds.get(fid) == null || fa.getActivityId() > aFeeds.get(fid).getActivityId()) {
-                                aFeeds.put(fid, fa);
-                            }
-                        }
-                        break;
-                    case FEED_END:
-                        terminatedFeeds.add(fid);
-                        break;
-                    default: //ignore    
-                }
-            }
-            for (FeedActivity f : aFeeds.values()) {
-                System.out.println("ACTIVE FEEDS " + f.getFeedName());
-                activeFeeds.add(f);
-            }
-            return activeFeeds;
-        } catch (Exception e) {
-            throw new MetadataException(e);
-        }
-    }
+ 
 
     @Override
     public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
@@ -1543,46 +1422,37 @@ public class MetadataNode implements IMetadataNode {
 
     }
 
-    public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverse, String feedName)
-            throws MetadataException, RemoteException {
-        List<FeedActivity> feedActivities = new ArrayList<FeedActivity>();
+  
+    @Override
+    public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
+            RemoteException {
         try {
-            ITupleReference searchKey = createTuple(dataverse, feedName);
-            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
-            List<FeedActivity> results = new ArrayList<FeedActivity>();
-            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
-                    tupleReaderWriter);
-            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
-
-            if (!results.isEmpty()) {
-                Collections.sort(results); // most recent feed activity
-                Set<String> terminatedDatasets = new HashSet<String>();
-                Set<String> activeDatasets = new HashSet<String>();
-
-                for (FeedActivity result : results) {
-                    switch (result.getFeedActivityType()) {
-                        case FEED_BEGIN:
-                            if (!terminatedDatasets.contains(result.getDatasetName())) {
-                                feedActivities.add(result);
-                                activeDatasets.add(result.getDatasetName());
-                            }
-                            break;
-                        case FEED_END:
-                            if (!activeDatasets.contains(result.getDatasetName())) {
-                                terminatedDatasets.add(result.getDatasetName());
-                            }
-                            break;
-                    }
-
-                }
-            }
-            return feedActivities;
+            ITupleReference searchKey = createTuple(dataverseName, policyName);
+            ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey);
+            deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, tuple);
+        } catch (TreeIndexException e) {
+            throw new MetadataException("Unknown feed policy " + policyName, e);
         } catch (Exception e) {
             throw new MetadataException(e);
         }
     }
 
     @Override
+    public List<FeedPolicy> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
+            RemoteException {
+        try {
+            ITupleReference searchKey = createTuple(dataverse);
+            FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(false);
+            IValueExtractor<FeedPolicy> valueExtractor = new MetadataEntityValueExtractor<FeedPolicy>(tupleReaderWriter);
+            List<FeedPolicy> results = new ArrayList<FeedPolicy>();
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey, valueExtractor, results);
+            return results;
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+    }
+    
+    @Override
     public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
         try {
             // Insert into the 'externalFiles' dataset.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index 826ebbf..24962b5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -26,11 +26,14 @@ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Library;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
 import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
 
 /**
@@ -228,8 +231,16 @@ public class MetadataTransactionContext extends MetadataCache {
 
     }
 
-    public void dropFeed(String dataverse, String feedName) {
-        Feed feed = new Feed(dataverse, feedName, null, null, null);
+    public void dropFeed(String dataverseName, String feedName, FeedType feedType) {
+        Feed feed = null;
+        switch (feedType) {
+            case PRIMARY:
+                feed = new PrimaryFeed(dataverseName, feedName, null, null, null);
+                break;
+            case SECONDARY:
+                feed = new SecondaryFeed(dataverseName, feedName, null, null);
+                break;
+        }
         droppedCache.addFeedIfNotExists(feed);
         logAndApply(new MetadataLogicalOperation(feed, false));
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
deleted file mode 100644
index 049a45c..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package edu.uci.ics.asterix.metadata.api;
-
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Set;
-
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
-
-public interface IClusterEventsSubscriber {
-
-    /**
-     * @param deadNodeIds
-     * @return
-     */
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
-
-    /**
-     * @param joinedNodeId
-     * @return
-     */
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
-
-    /**
-     * @param response
-     */
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response);
-
-    /**
-     * @param previousState
-     * @param newState
-     */
-    public void notifyStateChange(State previousState, State newState);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
deleted file mode 100644
index 65ac354..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.api;
-
-public interface IClusterManagementWork {
-
-    public enum WorkType {
-        ADD_NODE,
-        REMOVE_NODE
-    }
-
-    public WorkType getClusterManagementWorkType();
-
-    public int getWorkId();
-
-    public IClusterEventsSubscriber getSourceSubscriber();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
index ea07a62..0351468 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
@@ -2,6 +2,7 @@ package edu.uci.ics.asterix.metadata.api;
 
 import java.util.Set;
 
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 0b3bae6..6c098fb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -19,7 +19,6 @@ import java.rmi.RemoteException;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -30,8 +29,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -489,26 +486,8 @@ public interface IMetadataManager {
      */
     public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException;
 
-    /**
-     * @param ctx
-     * @param feedId
-     * @param feedActivity
-     * @throws MetadataException
-     */
-    public void registerFeedActivity(MetadataTransactionContext ctx, FeedConnectionId feedId, FeedActivity feedActivity)
-            throws MetadataException;
-
-    /**
-     * @param ctx
-     * @param dataverseName
-     * @param datasetName
-     * @return
-     * @throws MetadataException
-     */
-    public FeedActivity getRecentActivityOnFeedConnection(MetadataTransactionContext ctx, FeedConnectionId feedId,
-            FeedActivityType... activityTypeFilter) throws MetadataException;
-
-    /**
+   
+   /**
      * @param ctx
      * @param policy
      * @throws MetadataException
@@ -525,19 +504,7 @@ public interface IMetadataManager {
     public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
             throws MetadataException;
 
-    /**
-     * @param ctx
-     * @param dataverse
-     * @param dataset
-     * @return
-     * @throws MetadataException
-     */
-    public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx, String dataverse, String dataset)
-            throws MetadataException;
-
-    public List<FeedActivity> getConnectFeedActivitiesForFeed(MetadataTransactionContext ctx, String dataverse,
-            String dataset) throws MetadataException;
-
+   
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
 
     public int getMostRecentDatasetId() throws MetadataException;