You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:00 UTC

[06/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
index 9dcaef4..f16e24b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.util;
 
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public class ExternalDataExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
     public static final String MISSING_PARAMETER = "Missing parameter.\n";
@@ -29,4 +33,22 @@ public class ExternalDataExceptionUtils {
         return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
                 + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
     }
+
+    public static String concat(String... vals) {
+        return Arrays.toString(vals);
+    }
+
+    // For now, we are accepting all exceptions as resolvable by adapter.
+    public static boolean isResolvable(Exception e) {
+        return true;
+    }
+
+    public static HyracksDataException suppress(HyracksDataException hde, Throwable th) {
+        if (hde == null) {
+            return new HyracksDataException(th);
+        } else {
+            hde.addSuppressed(th);
+            return hde;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c1c1b5..c9be872 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConstants;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
@@ -134,6 +133,15 @@ public class ExternalDataUtils {
         return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
     }
 
+    public static void setRecordFormat(Map<String, String> configuration, String format) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
+            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
+        }
+    }
+
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 
     private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
@@ -219,4 +227,31 @@ public class ExternalDataUtils {
                         .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
                 .newInstance();
     }
+
+    public static boolean isFeed(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            return false;
+        } else {
+            return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
+        }
+    }
+
+    public static void prepareFeed(Map<String, String> configuration, String dataverseName, String feedName) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
+        }
+        configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataverseName);
+        configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
+    }
+
+    public static boolean keepDataSourceOpen(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)) {
+            return true;
+        }
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
+    }
+
+    public static String getFeedName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
new file mode 100644
index 0000000..cc21360
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedConstants {
+
+    public final static String FEEDS_METADATA_DV = "feeds_metadata";
+    public final static String FAILED_TUPLE_DATASET = "failed_tuple";
+    public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
+    public final static String FAILED_TUPLE_DATASET_KEY = "id";
+
+    public static final class StatisticsConstants {
+        public static final String INTAKE_TUPLEID = "intake-tupleid";
+        public static final String INTAKE_PARTITION = "intake-partition";
+        public static final String INTAKE_TIMESTAMP = "intake-timestamp";
+        public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
+        public static final String STORE_TIMESTAMP = "store-timestamp";
+
+    }
+
+    public static final class MessageConstants {
+        public static final String MESSAGE_TYPE = "message-type";
+        public static final String NODE_ID = "nodeId";
+        public static final String DATAVERSE = "dataverse";
+        public static final String FEED = "feed";
+        public static final String DATASET = "dataset";
+        public static final String AQL = "aql";
+        public static final String RUNTIME_TYPE = "runtime-type";
+        public static final String PARTITION = "partition";
+        public static final String INTAKE_PARTITION = "intake-partition";
+        public static final String INFLOW_RATE = "inflow-rate";
+        public static final String OUTFLOW_RATE = "outflow-rate";
+        public static final String MODE = "mode";
+        public static final String CURRENT_CARDINALITY = "current-cardinality";
+        public static final String REDUCED_CARDINALITY = "reduced-cardinality";
+        public static final String VALUE_TYPE = "value-type";
+        public static final String VALUE = "value";
+        public static final String CPU_LOAD = "cpu-load";
+        public static final String N_RUNTIMES = "n_runtimes";
+        public static final String HEAP_USAGE = "heap_usage";
+        public static final String OPERAND_ID = "operand-id";
+        public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
+        public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
+        public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
+        public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
+        public static final String COMMIT_ACKS = "commit-acks";
+        public static final String MAX_WINDOW_ACKED = "max-window-acked";
+        public static final String BASE = "base";
+        public static final String NOT_APPLICABLE = "N/A";
+
+    }
+
+    public static final class NamingConstants {
+        public static final String LIBRARY_NAME_SEPARATOR = "#";
+    }
+
+    public static class JobConstants {
+        public static final int DEFAULT_FRAME_SIZE = 8192;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
new file mode 100644
index 0000000..a2bdd64
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedFrameUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class FeedFrameUtil {
+    public static ByteBuffer removeBadTuple(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta)
+            throws HyracksDataException {
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame slicedFrame = new VSizeFrame(ctx);
+        appender.reset(slicedFrame, true);
+        int totalTuples = fta.getTupleCount();
+        for (int ti = 0; ti < totalTuples; ti++) {
+            if (ti != tupleIndex) {
+                appender.append(fta, ti);
+            }
+        }
+        return slicedFrame.getBuffer();
+    }
+
+    public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize)
+            throws HyracksDataException {
+        NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame sampledFrame = new VSizeFrame(ctx);
+        appender.reset(sampledFrame, true);
+        int nextTupleIndex = 0;
+        while (it.hasNext()) {
+            nextTupleIndex = it.next();
+            appender.append(fta, nextTupleIndex);
+        }
+        return sampledFrame.getBuffer();
+    }
+
+    private static class NChooseKIterator {
+
+        private final int n;
+        private final int k;
+        private final BitSet bSet;
+        private final Random random;
+
+        private int traversed = 0;
+
+        public NChooseKIterator(int n, int k) {
+            this.n = n;
+            this.k = k;
+            this.bSet = new BitSet(n);
+            bSet.set(0, n - 1);
+            this.random = new Random();
+        }
+
+        public boolean hasNext() {
+            return traversed < k;
+        }
+
+        public int next() {
+            if (hasNext()) {
+                traversed++;
+                int startOffset = random.nextInt(n);
+                int pos = -1;
+                while (pos < 0) {
+                    pos = bSet.nextSetBit(startOffset);
+                    if (pos < 0) {
+                        startOffset = 0;
+                    }
+                }
+                bSet.clear(pos);
+                return pos;
+            } else {
+                return -1;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
new file mode 100644
index 0000000..72b438d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TreeSet;
+
+import org.apache.commons.io.FileUtils;
+
+public class FeedLogManager {
+
+    public enum LogEntryType {
+        START,      // partition start
+        END,        // partition end
+        COMMIT,     // a record commit within a partition
+        SNAPSHOT    // an identifier that partitions with identifiers before this one should be ignored
+    }
+
+    public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
+    public static final String ERROR_LOG_FILE_NAME = "error.log";
+    public static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
+    public static final String START_PREFIX = "s:";
+    public static final String END_PREFIX = "e:";
+    public static final int PREFIX_SIZE = 2;
+    private String currentPartition;
+    private TreeSet<String> completed;
+    private Path dir;
+    private BufferedWriter progressLogger;
+    private BufferedWriter errorLogger;
+    private BufferedWriter recordLogger;
+
+    public FeedLogManager(File file) {
+        this.dir = file.toPath();
+        this.completed = new TreeSet<String>();
+    }
+
+    public void endPartition() throws IOException {
+        logProgress(END_PREFIX + currentPartition);
+        completed.add(currentPartition);
+    }
+
+    public void endPartition(String partition) throws IOException {
+        currentPartition = partition;
+        logProgress(END_PREFIX + currentPartition);
+        completed.add(currentPartition);
+    }
+
+    public void startPartition(String partition) throws IOException {
+        currentPartition = partition;
+        logProgress(START_PREFIX + currentPartition);
+    }
+
+    public boolean exists() {
+        return Files.exists(dir);
+    }
+
+    public void open() throws IOException {
+        // read content of logs.
+        BufferedReader reader = Files.newBufferedReader(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
+        String log = reader.readLine();
+        while (log != null) {
+            if (log.startsWith(END_PREFIX)) {
+                completed.add(getSplitId(log));
+            }
+            log = reader.readLine();
+        }
+        reader.close();
+
+        progressLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+        errorLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + ERROR_LOG_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+        recordLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+    }
+
+    public void close() throws IOException {
+        progressLogger.close();
+        errorLogger.close();
+        recordLogger.close();
+    }
+
+    public boolean create() throws IOException {
+        File f = dir.toFile();
+        f.mkdirs();
+        new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
+        new File(f, ERROR_LOG_FILE_NAME).createNewFile();
+        new File(f, BAD_RECORDS_FILE_NAME).createNewFile();
+        return true;
+    }
+
+    public boolean destroy() throws IOException {
+        File f = dir.toFile();
+        FileUtils.deleteDirectory(f);
+        return true;
+    }
+
+    public void logProgress(String log) throws IOException {
+        progressLogger.write(log);
+        progressLogger.newLine();
+    }
+
+    public void logError(String error, Throwable th) throws IOException {
+        errorLogger.append(error);
+        errorLogger.newLine();
+        errorLogger.append(th.toString());
+        errorLogger.newLine();
+    }
+
+    public void logRecord(String record, Exception e) throws IOException {
+        recordLogger.append(record);
+        recordLogger.newLine();
+        recordLogger.append(e.toString());
+        recordLogger.newLine();
+    }
+
+    public static String getSplitId(String log) {
+        return log.substring(PREFIX_SIZE);
+    }
+
+    public boolean isSplitRead(String split) {
+        return completed.contains(split);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
new file mode 100644
index 0000000..224ee31
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class FeedUtils {
+    private static String prepareDataverseFeedName(String dataverseName, String feedName) {
+        return dataverseName + File.separator + feedName;
+    }
+
+    public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
+            AlgebricksPartitionConstraint partitionConstraints) throws Exception {
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
+            throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
+        }
+        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+        List<FileSplit> splits = new ArrayList<FileSplit>();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        int i = 0;
+        for (String nd : locations) {
+            // Always get the first partition
+            ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
+            String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
+                    nodePartition.getPartitionId());
+            // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
+            File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
+                    + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
+            splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
+            i++;
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
+    public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
+        return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
new file mode 100644
index 0000000..4bb9d92
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FileSystemWatcher {
+
+    private static Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+    private final WatchService watcher;
+    private final HashMap<WatchKey, Path> keys;
+    private final LinkedList<File> files = new LinkedList<File>();
+    private Iterator<File> it;
+    private final String expression;
+    private final FeedLogManager logManager;
+    private final Path path;
+    private final boolean isFeed;
+    private boolean done;
+    private File current;
+
+    public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
+            throws IOException {
+        this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
+        this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+        this.logManager = logManager;
+        this.expression = expression;
+        this.path = inputResource;
+        this.isFeed = isFeed;
+    }
+
+    public void init() throws IOException {
+        LinkedList<Path> dirs = null;
+        dirs = new LinkedList<Path>();
+        LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
+        it = files.iterator();
+        if (isFeed) {
+            for (Path path : dirs) {
+                register(path);
+            }
+            resume();
+        }
+    }
+
+    /**
+     * Register the given directory, and all its sub-directories, with the
+     * WatchService.
+     */
+    private void register(Path dir) throws IOException {
+        WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                StandardWatchEventKinds.ENTRY_MODIFY);
+        keys.put(key, dir);
+    }
+
+    private void resume() throws IOException {
+        if (logManager == null) {
+            return;
+        }
+        if (logManager.exists()) {
+            logManager.open();
+        } else {
+            logManager.create();
+            logManager.open();
+            return;
+        }
+        /*
+         * Done processing the progress log file. We now have:
+         * the files that were completed.
+         */
+
+        if (it == null) {
+            return;
+        }
+        while (it.hasNext()) {
+            File file = it.next();
+            if (logManager.isSplitRead(file.getAbsolutePath())) {
+                // File was read completely, remove it from the files list
+                it.remove();
+            }
+        }
+        // reset the iterator
+        it = files.iterator();
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+        return (WatchEvent<T>) event;
+    }
+
+    private void handleEvents(WatchKey key) {
+        // get dir associated with the key
+        Path dir = keys.get(key);
+        if (dir == null) {
+            // This should never happen
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("WatchKey not recognized!!");
+            }
+            return;
+        }
+        for (WatchEvent<?> event : key.pollEvents()) {
+            Kind<?> kind = event.kind();
+            // TODO: Do something about overflow events
+            // An overflow event means that some events were dropped
+            if (kind == StandardWatchEventKinds.OVERFLOW) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Overflow event. Some events might have been missed");
+                }
+                continue;
+            }
+
+            // Context for directory entry event is the file name of entry
+            WatchEvent<Path> ev = cast(event);
+            Path name = ev.context();
+            Path child = dir.resolve(name);
+            // if directory is created then register it and its sub-directories
+            if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
+                try {
+                    if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+                        register(child);
+                    } else {
+                        // it is a file, add it to the files list.
+                        LocalFileSystemUtils.validateAndAdd(child, expression, files);
+                    }
+                } catch (IOException e) {
+                    if (LOGGER.isEnabledFor(Level.ERROR)) {
+                        LOGGER.error(e);
+                    }
+                }
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        if (!done) {
+            if (watcher != null) {
+                watcher.close();
+            }
+            if (logManager != null) {
+                if (current != null) {
+                    logManager.startPartition(current.getAbsolutePath());
+                    logManager.endPartition();
+                }
+                logManager.close();
+                current = null;
+            }
+            done = true;
+        }
+    }
+
+    public File next() throws IOException {
+        if (current != null && logManager != null) {
+            logManager.startPartition(current.getAbsolutePath());
+            logManager.endPartition();
+        }
+        current = it.next();
+        return current;
+    }
+
+    private boolean endOfEvents(WatchKey key) {
+        // reset key and remove from set if directory no longer accessible
+        if (!key.reset()) {
+            keys.remove(key);
+            if (keys.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasNext() {
+        if (it.hasNext()) {
+            return true;
+        }
+        if (done || !isFeed) {
+            return false;
+        }
+        files.clear();
+        // Read new Events (Polling first to add all available files)
+        WatchKey key;
+        key = watcher.poll();
+        while (key != null) {
+            handleEvents(key);
+            if (endOfEvents(key)) {
+                return false;
+            }
+            key = watcher.poll();
+        }
+        // No file was found, wait for the filesystem to push events
+        while (files.isEmpty()) {
+            try {
+                key = watcher.take();
+            } catch (InterruptedException x) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Feed Closed");
+                }
+                return false;
+            } catch (ClosedWatchServiceException e) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("The watcher has exited");
+                }
+                return false;
+            }
+            handleEvents(key);
+            if (endOfEvents(key)) {
+                return false;
+            }
+        }
+        // files were found, re-create the iterator and move it one step
+        it = files.iterator();
+        return it.hasNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
new file mode 100644
index 0000000..d6e9463
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+public class LocalFileSystemUtils {
+
+    //TODO: replace this method by FileUtils.iterateFilesAndDirs(.)
+    public static void traverse(final LinkedList<File> files, File root, final String expression,
+            final LinkedList<Path> dirs) throws IOException {
+        if (!Files.exists(root.toPath())) {
+            return;
+        }
+        if (!Files.isDirectory(root.toPath())) {
+            validateAndAdd(root.toPath(), expression, files);
+        }
+        //FileUtils.iterateFilesAndDirs(directory, fileFilter, dirFilter)
+        Files.walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) throws IOException {
+                if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
+                    return FileVisitResult.TERMINATE;
+                }
+                if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
+                    if (dirs != null) {
+                        dirs.add(path);
+                    }
+                    //get immediate children files
+                    File[] content = path.toFile().listFiles();
+                    for (File file : content) {
+                        if (!file.isDirectory()) {
+                            validateAndAdd(file.toPath(), expression, files);
+                        }
+                    }
+                } else {
+                    // Path is a file, add to list of files if it matches the expression
+                    validateAndAdd(path, expression, files);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    public static void validateAndAdd(Path path, String expression, LinkedList<File> files) {
+        if (expression == null || Pattern.matches(expression, path.toString())) {
+            files.add(new File(path.toString()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
new file mode 100644
index 0000000..b62dda8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+
+/**
+ * Factory for creating instance of {@link NodeResolver}
+ */
+public class NodeResolverFactory implements INodeResolverFactory {
+
+    private static final INodeResolver INSTANCE = new NodeResolver();
+
+    @Override
+    public INodeResolver createNodeResolver() {
+        return INSTANCE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
new file mode 100644
index 0000000..ec866be
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetGenerator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.util.DataGenerator.InitializationInfo;
+import org.apache.asterix.external.util.DataGenerator.TweetMessage;
+import org.apache.asterix.external.util.DataGenerator.TweetMessageIterator;
+
+public class TweetGenerator {
+
+    private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
+
+    private static final int DEFAULT_DURATION = INFINITY;
+
+    private int duration;
+    private TweetMessageIterator tweetIterator = null;
+    private int partition;
+    private long tweetCount = 0;
+    private int frameTweetCount = 0;
+    private int numFlushedTweets = 0;
+    private DataGenerator dataGenerator = null;
+    private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+    private String[] fields;
+    private final List<OutputStream> subscribers;
+    private final Object lock = new Object();
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
+
+    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+        dataGenerator = new DataGenerator(new InitializationInfo());
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
+        this.subscribers = new ArrayList<OutputStream>();
+    }
+
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+        System.out.println(tweet);
+        tweetCount++;
+        byte[] b = tweet.getBytes();
+        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+            flush();
+            numFlushedTweets += frameTweetCount;
+            frameTweetCount = 0;
+            outputBuffer.put(b);
+        } else {
+            outputBuffer.put(b);
+        }
+        frameTweetCount++;
+    }
+
+    private void flush() throws IOException {
+        outputBuffer.flip();
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
+                } catch (Exception e) {
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
+        outputBuffer.position(0);
+        outputBuffer.limit(32 * 1024);
+    }
+
+    public boolean generateNextBatch(int numTweets) throws Exception {
+        boolean moreData = tweetIterator.hasNext();
+        if (!moreData) {
+            if (outputBuffer.position() > 0) {
+                flush();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
+            return false;
+        } else {
+            int count = 0;
+            while (count < numTweets) {
+                writeTweetString(tweetIterator.next());
+                count++;
+            }
+            return true;
+        }
+    }
+
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                os.close();
+            }
+        }
+    }
+
+    public boolean isSubscribed() {
+        return !subscribers.isEmpty();
+    }
+
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
deleted file mode 100644
index f8914a6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import twitter4j.Status;
-import twitter4j.User;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TweetProcessor {
-
-    private IAObject[] mutableTweetFields;
-    private IAObject[] mutableUserFields;
-    private AMutableRecord mutableRecord;
-    private AMutableRecord mutableUser;
-
-    private final Map<String, Integer> userFieldNameMap = new HashMap<>();
-    private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
-
-
-    public TweetProcessor(ARecordType recordType) {
-        initFieldNames(recordType);
-        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
-                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
-        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], mutableUserFields);
-
-        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
-                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
-        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
-
-    }
-
-    // Initialize the hashmap values for the field names and positions
-    private void initFieldNames(ARecordType recordType) {
-        String tweetFields[] = recordType.getFieldNames();
-        for (int i=0; i<tweetFields.length; i++) {
-            tweetFieldNameMap.put(tweetFields[i], i);
-            if (tweetFields[i].equals(Tweet.USER)) {
-                IAType fieldType = recordType.getFieldTypes()[i];
-                if (fieldType.getTypeTag() == ATypeTag.RECORD) {
-                    String userFields[]  = ((ARecordType)fieldType).getFieldNames();
-                    for (int j=0; j<userFields.length; j++) {
-                        userFieldNameMap.put(userFields[j], j);
-                    }
-                }
-
-            }
-        }
-    }
-
-
-    public AMutableRecord processNextTweet(Status tweet) {
-        User user = tweet.getUser();
-
-        // Tweet user data
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]).setValue(JObjectUtil.getNormalizedString(user.getName()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]).setValue(user.getFollowersCount());
-
-
-        // Tweet data
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
-        int userPos = tweetFieldNameMap.get(Tweet.USER);
-        for (int i = 0; i < mutableUserFields.length; i++) {
-            ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
-        }
-        if (tweet.getGeoLocation() != null) {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(tweet.getGeoLocation().getLatitude());
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(tweet.getGeoLocation().getLongitude());
-        } else {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
-        }
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]).setValue(JObjectUtil.getNormalizedString(
-                tweet.getCreatedAt().toString()));
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
-
-        for (int i = 0; i < mutableTweetFields.length; i++) {
-            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
-        }
-
-        return mutableRecord;
-
-    }
-
-    public AMutableRecord getMutableRecord() {
-        return mutableRecord;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
index 70bd3e1..16f8b0a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
@@ -18,14 +18,11 @@
  */
 package org.apache.asterix.external.library;
 
-import java.util.Random;
-
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JObjects.JInt;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
-import org.apache.asterix.external.api.IExternalScalarFunction;
-import org.apache.asterix.external.api.IFunctionHelper;
-import org.apache.asterix.external.library.java.JTypeTag;
 
 /**
  * Accepts an input record of type Open{ id: int32, text: string }
@@ -35,11 +32,8 @@ import org.apache.asterix.external.library.java.JTypeTag;
  */
 public class UpperCaseFunction implements IExternalScalarFunction {
 
-    private Random random;
-
     @Override
     public void initialize(IFunctionHelper functionHelper) {
-        random = new Random();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index df0fb94..2fc289b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -19,21 +19,24 @@
 package org.apache.asterix.external.library.adapter;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
-import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapter extends StreamBasedAdapter {
+public class TestTypedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -45,25 +48,34 @@ public class TestTypedAdapter extends StreamBasedAdapter {
 
     private DummyGenerator generator;
 
+    protected final ITupleParser tupleParser;
+
+    protected final IAType sourceDatatype;
+
+    protected static final Logger LOGGER = Logger.getLogger(TestTypedAdapter.class.getName());
+
     public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
             Map<String, String> configuration, int partition) throws IOException {
-        super(parserFactory, sourceDatatype, ctx, partition);
         pos = new PipedOutputStream();
         pis = new PipedInputStream(pos);
         this.configuration = configuration;
+        this.tupleParser = parserFactory.createTupleParser(ctx);
+        this.sourceDatatype = sourceDatatype;
     }
 
     @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return pis;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter frameWriter) throws Exception {
+    public void start(int partition, IFrameWriter writer) throws Exception {
         generator = new DummyGenerator(configuration, pos);
         ExecutorService executor = Executors.newSingleThreadExecutor();
         executor.execute(generator);
-        super.start(partition, frameWriter);
+        if (pis != null) {
+            tupleParser.parse(pis, writer);
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning(
+                        "Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
+            }
+        }
     }
 
     private static class DummyGenerator implements Runnable {
@@ -135,4 +147,13 @@ public class TestTypedAdapter extends StreamBasedAdapter {
         return false;
     }
 
+    @Override
+    public boolean pause() {
+        return false;
+    }
+
+    @Override
+    public boolean resume() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 6b08f3a..5346bf2 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -22,9 +22,9 @@ import java.io.InputStream;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.om.types.ARecordType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 71c762a..3f85ba9 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -24,12 +24,13 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedActivity;
-import org.apache.asterix.common.feeds.FeedConnectionRequest;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedActivity;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -41,12 +42,9 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.entities.SecondaryFeed;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -188,14 +186,13 @@ public class SubscribeFeedStatement implements Statement {
         try {
             switch (feed.getFeedType()) {
                 case PRIMARY:
-                    Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+                    Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
 
-                    factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
-                            mdTxnCtx);
+                    factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx);
                     outputType = factoryOutput.second.getTypeName();
                     break;
                 case SECONDARY:
-                    outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+                    outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx);
                     break;
             }
             return outputType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/javacc/AQL.jj b/asterix-lang-aql/src/main/javacc/AQL.jj
index 12e7897..8f62f74 100644
--- a/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -43,6 +43,7 @@ import org.apache.asterix.common.annotations.TypeDataGen;
 import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.aql.clause.DistinctClause;
@@ -127,7 +128,6 @@ import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 49aa74b..d9d09f7 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -20,12 +20,12 @@ package org.apache.asterix.lang.common.statement;
 
 import java.util.Map;
 
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 
 public class DatasetDecl implements Statement {
     protected final Identifier name;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 28e5af0..a1e6363 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -95,7 +96,6 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.OperatorType;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java b/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
index b016b62..f9cf99f 100644
--- a/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
+++ b/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/VariableCheckAndRewriteVisitor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
@@ -69,7 +70,6 @@ import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.util.SqlppFormatPrintUtil;
 import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppQueryExpressionVisitor;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.hyracks.algebricks.core.algebra.base.Counter;
 
 public class VariableCheckAndRewriteVisitor extends AbstractSqlppQueryExpressionVisitor<Expression, Void> {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 0e3fad8..547a10b 100644
--- a/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -44,6 +44,7 @@ import org.apache.asterix.common.annotations.TypeDataGen;
 import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
@@ -140,7 +141,6 @@ import org.apache.asterix.lang.sqlpp.optype.JoinType;
 import org.apache.asterix.lang.sqlpp.optype.SetOpType;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index ea65b20..a3a81de 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -35,7 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedPolicy;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
@@ -66,9 +66,9 @@ public class MetadataCache {
     protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
     // Key is adapter dataverse. Key of value map is the adapter name  
     protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
-  
+
     // Key is DataverseName, Key of the value map is the Policy name   
-    protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
+    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<String, Map<String, FeedPolicyEntity>>();
     // Key is library dataverse. Key of value map is the library name
     protected final Map<String, Map<String, Library>> libraries = new HashMap<String, Map<String, Library>>();
     // Key is library dataverse. Key of value map is the feed name  
@@ -110,18 +110,17 @@ public class MetadataCache {
                         synchronized (datatypes) {
                             synchronized (functions) {
                                 synchronized (adapters) {
-                                        synchronized (libraries) {
-                                            synchronized (compactionPolicies) {
-                                                dataverses.clear();
-                                                nodeGroups.clear();
-                                                datasets.clear();
-                                                indexes.clear();
-                                                datatypes.clear();
-                                                functions.clear();
-                                                adapters.clear();
-                                                libraries.clear();
-                                                compactionPolicies.clear();
-                                            }
+                                    synchronized (libraries) {
+                                        synchronized (compactionPolicies) {
+                                            dataverses.clear();
+                                            nodeGroups.clear();
+                                            datasets.clear();
+                                            indexes.clear();
+                                            datatypes.clear();
+                                            functions.clear();
+                                            adapters.clear();
+                                            libraries.clear();
+                                            compactionPolicies.clear();
                                         }
                                     }
                                 }
@@ -131,9 +130,9 @@ public class MetadataCache {
                 }
             }
         }
-    
+    }
 
-    public Object addDataverseIfNotExists(Dataverse dataverse) {
+    public Dataverse addDataverseIfNotExists(Dataverse dataverse) {
         synchronized (dataverses) {
             synchronized (datasets) {
                 synchronized (datatypes) {
@@ -149,7 +148,7 @@ public class MetadataCache {
         }
     }
 
-    public Object addDatasetIfNotExists(Dataset dataset) {
+    public Dataset addDatasetIfNotExists(Dataset dataset) {
         synchronized (datasets) {
             synchronized (indexes) {
                 // Add the primary index associated with the dataset, if the dataset is an
@@ -175,13 +174,13 @@ public class MetadataCache {
         }
     }
 
-    public Object addIndexIfNotExists(Index index) {
+    public Index addIndexIfNotExists(Index index) {
         synchronized (indexes) {
             return addIndexIfNotExistsInternal(index);
         }
     }
 
-    public Object addDatatypeIfNotExists(Datatype datatype) {
+    public Datatype addDatatypeIfNotExists(Datatype datatype) {
         synchronized (datatypes) {
             Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
             if (m == null) {
@@ -195,7 +194,7 @@ public class MetadataCache {
         }
     }
 
-    public Object addNodeGroupIfNotExists(NodeGroup nodeGroup) {
+    public NodeGroup addNodeGroupIfNotExists(NodeGroup nodeGroup) {
         synchronized (nodeGroups) {
             if (!nodeGroups.containsKey(nodeGroup.getNodeGroupName())) {
                 return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
@@ -204,7 +203,7 @@ public class MetadataCache {
         }
     }
 
-    public Object addCompactionPolicyIfNotExists(CompactionPolicy compactionPolicy) {
+    public CompactionPolicy addCompactionPolicyIfNotExists(CompactionPolicy compactionPolicy) {
         synchronized (compactionPolicy) {
             Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
             if (p == null) {
@@ -220,17 +219,17 @@ public class MetadataCache {
         }
     }
 
-    public Object dropCompactionPolicy(CompactionPolicy compactionPolicy) {
+    public CompactionPolicy dropCompactionPolicy(CompactionPolicy compactionPolicy) {
         synchronized (compactionPolicies) {
             Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
             if (p != null && p.get(compactionPolicy.getPolicyName()) != null) {
-                return p.remove(compactionPolicy).getPolicyName();
+                return p.remove(compactionPolicy);
             }
             return null;
         }
     }
 
-    public Object dropDataverse(Dataverse dataverse) {
+    public Dataverse dropDataverse(Dataverse dataverse) {
         synchronized (dataverses) {
             synchronized (datasets) {
                 synchronized (indexes) {
@@ -238,26 +237,25 @@ public class MetadataCache {
                         synchronized (functions) {
                             synchronized (adapters) {
                                 synchronized (libraries) {
-                                        synchronized (feeds) {
-                                            synchronized (compactionPolicies) {
-                                                datasets.remove(dataverse.getDataverseName());
-                                                indexes.remove(dataverse.getDataverseName());
-                                                datatypes.remove(dataverse.getDataverseName());
-                                                adapters.remove(dataverse.getDataverseName());
-                                                compactionPolicies.remove(dataverse.getDataverseName());
-                                                List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
-                                                for (FunctionSignature signature : functions.keySet()) {
-                                                    if (signature.getNamespace().equals(dataverse.getDataverseName())) {
-                                                        markedFunctionsForRemoval.add(signature);
-                                                    }
-                                                }
-                                                for (FunctionSignature signature : markedFunctionsForRemoval) {
-                                                    functions.remove(signature);
+                                    synchronized (feeds) {
+                                        synchronized (compactionPolicies) {
+                                            datasets.remove(dataverse.getDataverseName());
+                                            indexes.remove(dataverse.getDataverseName());
+                                            datatypes.remove(dataverse.getDataverseName());
+                                            adapters.remove(dataverse.getDataverseName());
+                                            compactionPolicies.remove(dataverse.getDataverseName());
+                                            List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
+                                            for (FunctionSignature signature : functions.keySet()) {
+                                                if (signature.getNamespace().equals(dataverse.getDataverseName())) {
+                                                    markedFunctionsForRemoval.add(signature);
                                                 }
-                                                libraries.remove(dataverse.getDataverseName());
-                                                feeds.remove(dataverse.getDataverseName());
-                                                return dataverses.remove(dataverse.getDataverseName());
                                             }
+                                            for (FunctionSignature signature : markedFunctionsForRemoval) {
+                                                functions.remove(signature);
+                                            }
+                                            libraries.remove(dataverse.getDataverseName());
+                                            feeds.remove(dataverse.getDataverseName());
+                                            return dataverses.remove(dataverse.getDataverseName());
                                         }
                                     }
                                 }
@@ -267,9 +265,9 @@ public class MetadataCache {
                 }
             }
         }
-    
+    }
 
-    public Object dropDataset(Dataset dataset) {
+    public Dataset dropDataset(Dataset dataset) {
         synchronized (datasets) {
             synchronized (indexes) {
 
@@ -289,7 +287,7 @@ public class MetadataCache {
         }
     }
 
-    public Object dropIndex(Index index) {
+    public Index dropIndex(Index index) {
         synchronized (indexes) {
             Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
             if (datasetMap == null) {
@@ -304,7 +302,7 @@ public class MetadataCache {
         }
     }
 
-    public Object dropDatatype(Datatype datatype) {
+    public Datatype dropDatatype(Datatype datatype) {
         synchronized (datatypes) {
             Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
             if (m == null) {
@@ -314,7 +312,7 @@ public class MetadataCache {
         }
     }
 
-    public Object dropNodeGroup(NodeGroup nodeGroup) {
+    public NodeGroup dropNodeGroup(NodeGroup nodeGroup) {
         synchronized (nodeGroups) {
             return nodeGroups.remove(nodeGroup.getNodeGroupName());
         }
@@ -405,11 +403,11 @@ public class MetadataCache {
      */
     protected class MetadataLogicalOperation {
         // Entity to be added/dropped.
-        public final IMetadataEntity entity;
+        public final IMetadataEntity<?> entity;
         // True for add, false for drop.
         public final boolean isAdd;
 
-        public MetadataLogicalOperation(IMetadataEntity entity, boolean isAdd) {
+        public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) {
             this.entity = entity;
             this.isAdd = isAdd;
         }
@@ -431,7 +429,7 @@ public class MetadataCache {
         }
     }
 
-    public Object addFunctionIfNotExists(Function function) {
+    public Function addFunctionIfNotExists(Function function) {
         synchronized (functions) {
             FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
                     function.getArity());
@@ -443,7 +441,7 @@ public class MetadataCache {
         }
     }
 
-    public Object dropFunction(Function function) {
+    public Function dropFunction(Function function) {
         synchronized (functions) {
             FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
                     function.getArity());
@@ -455,11 +453,11 @@ public class MetadataCache {
         }
     }
 
-    public Object addFeedPolicyIfNotExists(FeedPolicy feedPolicy) {
+    public Object addFeedPolicyIfNotExists(FeedPolicyEntity feedPolicy) {
         synchronized (feedPolicy) {
-            Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+            Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
             if (p == null) {
-                p = new HashMap<String, FeedPolicy>();
+                p = new HashMap<String, FeedPolicyEntity>();
                 p.put(feedPolicy.getPolicyName(), feedPolicy);
                 feedPolicies.put(feedPolicy.getDataverseName(), p);
             } else {
@@ -471,9 +469,9 @@ public class MetadataCache {
         }
     }
 
-    public Object dropFeedPolicy(FeedPolicy feedPolicy) {
+    public Object dropFeedPolicy(FeedPolicyEntity feedPolicy) {
         synchronized (feedPolicies) {
-            Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+            Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
             if (p != null && p.get(feedPolicy.getPolicyName()) != null) {
                 return p.remove(feedPolicy).getPolicyName();
             }
@@ -481,10 +479,10 @@ public class MetadataCache {
         }
     }
 
-    public Object addAdapterIfNotExists(DatasourceAdapter adapter) {
+    public DatasourceAdapter addAdapterIfNotExists(DatasourceAdapter adapter) {
         synchronized (adapters) {
-            Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
-                    .getNamespace());
+            Map<String, DatasourceAdapter> adaptersInDataverse = adapters
+                    .get(adapter.getAdapterIdentifier().getNamespace());
             if (adaptersInDataverse == null) {
                 adaptersInDataverse = new HashMap<String, DatasourceAdapter>();
                 adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse);
@@ -497,10 +495,10 @@ public class MetadataCache {
         }
     }
 
-    public Object dropAdapter(DatasourceAdapter adapter) {
+    public DatasourceAdapter dropAdapter(DatasourceAdapter adapter) {
         synchronized (adapters) {
-            Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
-                    .getNamespace());
+            Map<String, DatasourceAdapter> adaptersInDataverse = adapters
+                    .get(adapter.getAdapterIdentifier().getNamespace());
             if (adaptersInDataverse != null) {
                 return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
             }
@@ -508,10 +506,7 @@ public class MetadataCache {
         }
     }
 
-  
-
-
-    public Object addLibraryIfNotExists(Library library) {
+    public Library addLibraryIfNotExists(Library library) {
         synchronized (libraries) {
             Map<String, Library> libsInDataverse = libraries.get(library.getDataverseName());
             boolean needToAddd = (libsInDataverse == null || libsInDataverse.get(library.getName()) != null);
@@ -526,7 +521,7 @@ public class MetadataCache {
         }
     }
 
-    public Object dropLibrary(Library library) {
+    public Library dropLibrary(Library library) {
         synchronized (libraries) {
             Map<String, Library> librariesInDataverse = libraries.get(library.getDataverseName());
             if (librariesInDataverse != null) {
@@ -536,12 +531,11 @@ public class MetadataCache {
         }
     }
 
-    public Object addFeedIfNotExists(Feed feed) {
-        // TODO Auto-generated method stub
+    public Feed addFeedIfNotExists(Feed feed) {
         return null;
     }
 
-    public Object dropFeed(Feed feed) {
+    public Feed dropFeed(Feed feed) {
         synchronized (feeds) {
             Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName());
             if (feedsInDataverse != null) {
@@ -551,7 +545,7 @@ public class MetadataCache {
         }
     }
 
-    private Object addIndexIfNotExistsInternal(Index index) {
+    private Index addIndexIfNotExistsInternal(Index index) {
         Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
         if (datasetMap == null) {
             datasetMap = new HashMap<String, Map<String, Index>>();