Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/22 23:35:15 UTC

[27/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-app/pom.xml b/asterix-app/pom.xml
index 9b87d9f..09a4c4c 100644
--- a/asterix-app/pom.xml
+++ b/asterix-app/pom.xml
@@ -24,15 +24,14 @@
         <version>0.8.8-SNAPSHOT</version>
     </parent>
     <artifactId>asterix-app</artifactId>
-
-	<licenses>
-		<license>
-			<name>Apache License, Version 2.0</name>
-			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-			<distribution>repo</distribution>
-			<comments>A business-friendly OSS license</comments>
-		</license>
-	</licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
     <build>
         <plugins>
             <plugin>
@@ -95,9 +94,32 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-external-library</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>src/test/resources/externallib</outputDirectory>
+                            <overwrite>true</overwrite>
+                            <resources>
+                                <resource>
+                                    <directory>../asterix-external-data/target</directory>
+                                    <includes>
+                                        <include>testlib-zip-binary-assembly.zip</include>
+                                    </includes>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
-
     <dependencies>
         <dependency>
             <groupId>javax.servlet</groupId>
@@ -137,41 +159,41 @@
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-algebra</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-om</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>jar</type>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-metadata</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>jar</type>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-tools</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>jar</type>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-common</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>jar</type>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-common</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
@@ -181,7 +203,7 @@
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-transactions</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
@@ -217,7 +239,7 @@
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-test-framework</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -235,13 +257,13 @@
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-replication</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.asterix</groupId>
             <artifactId>asterix-external-data</artifactId>
-            <version>0.8.8-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file
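
The copy-external-library execution above stages the external-library test
package during generate-resources: it copies testlib-zip-binary-assembly.zip
from asterix-external-data/target into src/test/resources/externallib, where
the external library tests can find it on the test classpath. A minimal
sketch of resolving the staged zip follows; it is illustrative rather than
part of this patch, and the class name is hypothetical.

    import java.net.URL;

    public class ExternalLibLocator {
        public static void main(String[] args) {
            // Resolves the zip copied by the maven-resources-plugin execution
            // above; the resource path mirrors the configured outputDirectory.
            URL lib = ExternalLibLocator.class.getClassLoader()
                    .getResource("externallib/testlib-zip-binary-assembly.zip");
            System.out.println(lib != null ? "found " + lib
                    : "external library zip not on test classpath");
        }
    }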

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/assembly/binary-assembly.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/assembly/binary-assembly.xml b/asterix-app/src/main/assembly/binary-assembly.xml
index 013769f..91e2549 100644
--- a/asterix-app/src/main/assembly/binary-assembly.xml
+++ b/asterix-app/src/main/assembly/binary-assembly.xml
@@ -34,4 +34,14 @@
 			<outputDirectory>lib</outputDirectory>
 		</fileSet>
 	</fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>externallib</outputDirectory>
+            <includes>
+                <include>asterix-external-data:*:zip</include>
+            </includes>
+            <unpack>false</unpack>
+            <useTransitiveDependencies>false</useTransitiveDependencies>
+        </dependencySet>
+    </dependencySets>
 </assembly>
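
The new dependencySet ships the asterix-external-data zip artifact, unexpanded
(unpack is false, transitive dependencies excluded), into the externallib
directory of the binary assembly. As a hedged illustration of what lands on
disk (not part of this patch; the path and class name assume the assembly
layout above), the sketch below lists the entries of the shipped archive:

    import java.util.zip.ZipFile;

    public class ListExternalLib {
        public static void main(String[] args) throws Exception {
            // Inspect the packaged external library without unpacking it.
            try (ZipFile zip = new ZipFile("externallib/testlib-zip-binary-assembly.zip")) {
                zip.stream().forEach(entry -> System.out.println(entry.getName()));
            }
        }
    }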

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index 4df461b..79ce721 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -29,7 +29,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index 6957926..eacee6d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -32,13 +32,13 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.external.feed.api.IFeedLoadManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.watch.FeedActivity;
 import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
-import org.apache.asterix.feed.CentralFeedManager;
 
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
index d459775..52a140d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
@@ -26,9 +26,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.app.external.FeedLifecycleListener;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.message.RemoteSocketMessageListener;
-import org.apache.asterix.feed.FeedLifecycleListener;
 
 public class FeedServletUtil {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java b/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
new file mode 100644
index 0000000..cab5e64
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.util.List;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.api.common.SessionConfig.OutputFormat;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.api.ICentralFeedManager;
+import org.apache.asterix.external.feed.api.IFeedLoadManager;
+import org.apache.asterix.external.feed.api.IFeedTrackingManager;
+import org.apache.asterix.external.feed.message.SocketMessageListener;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class CentralFeedManager implements ICentralFeedManager {
+
+    private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+
+    public static ICentralFeedManager getInstance() {
+        return centralFeedManager;
+    }
+
+    private final int port;
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+    private final SocketMessageListener messageListener;
+
+    private CentralFeedManager() {
+        this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
+        this.feedLoadManager = new FeedLoadManager();
+        this.feedTrackingManager = new FeedTrackingManager();
+        this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
+    }
+
+    @Override
+    public void start() throws AsterixException {
+        messageListener.start();
+    }
+
+    @Override
+    public void stop() throws AsterixException, IOException {
+        messageListener.stop();
+    }
+
+    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobId jobId = hcc.startJob(spec);
+        if (waitForCompletion) {
+            hcc.waitForCompletion(jobId);
+        }
+        return jobId;
+    }
+
+    @Override
+    public IFeedLoadManager getFeedLoadManager() {
+        return feedLoadManager;
+    }
+
+    @Override
+    public IFeedTrackingManager getFeedTrackingManager() {
+        return feedTrackingManager;
+    }
+
+    public static class AQLExecutor {
+
+        private static final PrintWriter out = new PrintWriter(System.out, true);
+        private static final IParserFactory parserFactory = new AQLParserFactory();
+
+        public static void executeAQL(String aql) throws Exception {
+            IParser parser = parserFactory.createParser(new StringReader(aql));
+            List<Statement> statements = parser.parse();
+            SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
+            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                    QueryTranslator.ResultDelivery.SYNC);
+        }
+    }
+
+}
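
The nested AQLExecutor above lets feed-management code run AQL statements
programmatically: it parses the statement text, then compiles and executes it
synchronously through QueryTranslator. A hedged usage sketch follows
(illustrative only, not part of this commit; the AQL text and class name are
made up, and a running instance reachable via AsterixAppContextInfo is
assumed):

    public class AqlExecutorDemo {
        public static void main(String[] args) throws Exception {
            // Parses, compiles, and synchronously executes the statement.
            CentralFeedManager.AQLExecutor.executeAQL(
                    "use dataverse feeds; disconnect feed TwitterFeed from dataset Tweets;");
        }
    }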

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
new file mode 100644
index 0000000..2f497f9
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
+import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.file.IndexOperations;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class ExternalIndexingOperations {
+
+    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
+    public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
+
+    static {
+        FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
+        FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
+    }
+
+    public static boolean isIndexible(ExternalDatasetDetails ds) {
+        String adapter = ds.getAdapter();
+        return adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER);
+    }
+
+    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
+        return ds.getState() != ExternalDatasetTransactionState.COMMIT;
+    }
+
+    public static boolean isValidIndexName(String datasetName, String indexName) {
+        return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName));
+    }
+
+    public static String getFilesIndexName(String datasetName) {
+        return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX);
+    }
+
+    public static int getRIDSize(Dataset dataset) {
+        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
+        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+        return IndexingConstants.getComparatorFactories((dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)));
+    }
+
+    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
+        return IndexingConstants.getBuddyBtreeComparatorFactories();
+    }
+
+    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
+            throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        try {
+            // Create the file system object
+            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
+            // Get paths of dataset
+            String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
+            String[] paths = path.split(",");
+
+            // Add fileStatuses to files
+            for (String aPath : paths) {
+                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
+                for (int i = 0; i < fileStatuses.length; i++) {
+                    int nextFileNumber = files.size();
+                    if (fileStatuses[i].isDirectory()) {
+                        listSubFiles(dataset, fs, fileStatuses[i], files);
+                    } else {
+                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                                fileStatuses[i].getPath().toUri().getPath(),
+                                new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(),
+                                ExternalFilePendingOp.PENDING_NO_OP));
+                    }
+                }
+            }
+            // Close file system
+            fs.close();
+            if (files.size() == 0) {
+                throw new AlgebricksException("File Snapshot retrieved from external file system is empty");
+            }
+            return files;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to get list of HDFS files " + e);
+        }
+    }
+
+    /* list all files under the directory
+     * src is expected to be a folder
+     */
+    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList<ExternalFile> files)
+            throws IOException {
+        Path path = src.getPath();
+        FileStatus[] fileStatuses = srcFs.listStatus(path);
+        for (int i = 0; i < fileStatuses.length; i++) {
+            int nextFileNumber = files.size();
+            if (fileStatuses[i].isDirectory()) {
+                listSubFiles(dataset, srcFs, fileStatuses[i], files);
+            } else {
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
+                        fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP));
+            }
+        }
+    }
+
+    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
+        Configuration conf = new Configuration();
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName());
+        return FileSystem.get(conf);
+    }
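
    /*
     * Hedged illustration, not part of this patch: getFileSystemObject above
     * reads the HDFS endpoint from the dataset's properties map. A caller
     * would supply something like the following; the URL is made up, and the
     * literal key string is assumed to be what ExternalDataConstants.KEY_HDFS_URL
     * resolves to.
     */
    import java.util.HashMap;
    import java.util.Map;

    class HdfsPropsSketch {
        static Map<String, String> sampleProperties() {
            Map<String, String> props = new HashMap<>();
            // Key per ExternalDataConstants.KEY_HDFS_URL (assumed); value is illustrative.
            props.put("hdfs", "hdfs://namenode:9000");
            return props;
        }
    }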
+
+    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
+            ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
+                    throws MetadataException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                        getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
+                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
+                new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
+        PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.ExternalBTreeResource);
+        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider,
+                externalFilesSnapshot, createIndex);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
+                secondarySplitsAndConstraint.second);
+        spec.addRoot(externalFilesOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    /**
+     * This method creates an indexing operator that indexes records in HDFS.
+     *
+     * @param jobSpec
+     * @param itemType
+     * @param dataset
+     * @param files
+     * @param indexerDesc
+     * @return
+     * @throws Exception
+     */
+    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
+            JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files,
+            RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception {
+        ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        Map<String, String> configuration = externalDatasetDetails.getProperties();
+        IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(externalDatasetDetails.getAdapter(),
+                configuration, (ARecordType) itemType, files, true);
+        return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
+                new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+                adapterFactory.getPartitionConstraint());
+    }
+
+    public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+            JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
+            RecordDescriptor indexerDesc, List<ExternalFile> files) throws Exception {
+        if (files == null) {
+            files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
+        }
+        return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider);
+    }
+
+    /**
+     * At the end of this method, we expect to have 4 sets as follows:
+     * metadataFiles should retain only the files that were appended, in their original state
+     * addedFiles should contain new files, with file numbers assigned starting after the max original file number
+     * deletedFiles should contain files that no longer exist in the file system
+     * appendedFiles should contain the new file information for existing files that grew
+     * The method returns true in case of zero delta (the dataset is up to date)
+     *
+     * @param dataset
+     * @param metadataFiles
+     * @param addedFiles
+     * @param deletedFiles
+     * @param appendedFiles
+     * @return
+     * @throws MetadataException
+     * @throws AlgebricksException
+     */
+    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
+                    throws MetadataException, AlgebricksException {
+        boolean uptodate = true;
+        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
+
+        ArrayList<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
+
+        // Loop over file system files < taking care of added files >
+        for (ExternalFile fileSystemFile : fileSystemFiles) {
+            boolean fileFound = false;
+            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+            while (mdFilesIterator.hasNext()) {
+                ExternalFile metadataFile = mdFilesIterator.next();
+                if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
+                    // Same file name
+                    if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
+                        // Same timestamp
+                        if (fileSystemFile.getSize() == metadataFile.getSize()) {
+                            // Same size -> no op
+                            mdFilesIterator.remove();
+                            fileFound = true;
+                        } else {
+                            // Different size -> append op
+                            metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
+                            fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
+                            appendedFiles.add(fileSystemFile);
+                            fileFound = true;
+                            uptodate = false;
+                        }
+                    } else {
+                        // Same file name, Different file mod date -> delete and add
+                        metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
+                        deletedFiles
+                                .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0,
+                                        metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                                        metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
+                        fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
+                        fileSystemFile.setFileNumber(newFileNumber);
+                        addedFiles.add(fileSystemFile);
+                        newFileNumber++;
+                        fileFound = true;
+                        uptodate = false;
+                    }
+                }
+                if (fileFound) {
+                    break;
+                }
+            }
+            if (!fileFound) {
+                // File not stored previously in metadata -> pending add op
+                fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
+                fileSystemFile.setFileNumber(newFileNumber);
+                addedFiles.add(fileSystemFile);
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+
+        // Done with files from external file system -> metadata files now contain both deleted files and appended ones
+        // first, correct number assignment to deleted and updated files
+        for (ExternalFile deletedFile : deletedFiles) {
+            deletedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+        for (ExternalFile appendedFile : appendedFiles) {
+            appendedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+
+        // include the remaining deleted files
+        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+        while (mdFilesIterator.hasNext()) {
+            ExternalFile metadataFile = mdFilesIterator.next();
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
+                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                        metadataFile.getSize(), metadataFile.getPendingOp()));
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+        return uptodate;
+    }
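
    /*
     * A hedged, simplified restatement of the delta classification performed
     * by isDatasetUptodate above (editorial sketch, not part of this patch).
     * FileRecord is a hypothetical stand-in for ExternalFile: files are
     * matched by name, then compared by modification time and size.
     */
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class FileDeltaSketch {
        record FileRecord(String name, long modTime, long size) {}

        static boolean isUpToDate(List<FileRecord> metadata, List<FileRecord> fileSystem,
                List<FileRecord> added, List<FileRecord> deleted, List<FileRecord> appended) {
            boolean upToDate = true;
            Map<String, FileRecord> byName = new HashMap<>();
            for (FileRecord m : metadata) {
                byName.put(m.name(), m);
            }
            for (FileRecord f : fileSystem) {
                FileRecord m = byName.remove(f.name());
                if (m == null) {
                    added.add(f);                   // only on the file system -> pending add
                    upToDate = false;
                } else if (m.modTime() != f.modTime()) {
                    deleted.add(m);                 // replaced file -> drop the old entry
                    added.add(f);                   // ... and add the new one
                    upToDate = false;
                } else if (m.size() != f.size()) {
                    appended.add(f);                // same timestamp, larger size -> append
                    upToDate = false;
                }                                   // same name, time, and size -> no-op
            }
            if (!byName.isEmpty()) {
                deleted.addAll(byName.values());    // only in metadata -> pending drop
                upToDate = false;
            }
            return upToDate;
        }
    }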
+
+    public static Dataset createTransactionDataset(Dataset dataset) {
+        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
+                originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN);
+        Dataset transactionDataset = new Dataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), dataset.getNodeGroupName(),
+                dataset.getCompactionPolicy(), dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(),
+                DatasetType.EXTERNAL, dataset.getDatasetId(), dataset.getPendingOp());
+        return transactionDataset;
+    }
+
+    public static boolean isFileIndex(Index index) {
+        return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName())));
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
+            AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
+        String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                : indexDropStmt.getDataverseName();
+        String datasetName = indexDropStmt.getDatasetName();
+        String indexName = indexDropStmt.getIndexName();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+
+        return spec;
+    }
+
+    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
+            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
+            AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        for (ExternalFile file : metadataFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                files.add(file);
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                for (ExternalFile appendedFile : appendedFiles) {
+                    if (appendedFile.getFileName().equals(file.getFileName())) {
+                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(),
+                                file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(),
+                                ExternalFilePendingOp.PENDING_NO_OP));
+                    }
+                }
+            }
+        }
+        for (ExternalFile file : addedFiles) {
+            files.add(file);
+        }
+        Collections.sort(files);
+        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
+    }
+
+    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
+            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
+            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        // Create files list
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+
+        for (ExternalFile metadataFile : metadataFiles) {
+            if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) {
+                files.add(metadataFile);
+            } else {
+                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                files.add(metadataFile);
+            }
+        }
+        // add new files
+        for (ExternalFile file : addedFiles) {
+            files.add(file);
+        }
+        // add appended files
+        for (ExternalFile file : appendedFiles) {
+            files.add(file);
+        }
+
+        CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
+                index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
+                index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
+        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
+    }
+
+    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        boolean temp = ds.getDatasetDetails().isTemp();
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, JobSpecification spec) {
+        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, JobSpecification spec) {
+        return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, AqlMetadataProvider metadataProvider, JobSpecification spec)
+                    throws AlgebricksException, AsterixException {
+        int numPrimaryKeys = getRIDSize(ds);
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getItemTypeDataverseName(),
+                ds.getItemTypeName());
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        ATypeTag keyType = nestedKeyType.getTypeTag();
+
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
+        }
+        int[] primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+
+        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+
+        boolean temp = ds.getDatasetDetails().isTemp();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        boolean temp = ds.getDatasetDetails().isTemp();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
+            throws MetadataException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                        getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE);
+        spec.addRoot(compactOp);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondarySplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}
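
Each builder above returns a Hyracks JobSpecification that a caller submits to the cluster. A minimal sketch of such a caller, using the standard IHyracksClientConnection API (ds, indexes, metadataProvider, and hcc stand in for surrounding state and are hypothetical here):

    // Build and run the abort job for an external dataset's indexes (illustrative sketch).
    JobSpecification abortSpec = buildAbortOp(ds, indexes, metadataProvider);
    JobId jobId = hcc.startJob(abortSpec);  // hcc: an IHyracksClientConnection
    hcc.waitForCompletion(jobId);           // block until the job finishes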

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
new file mode 100755
index 0000000..a5654bd
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
+import org.apache.asterix.external.library.ExternalLibrary;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.external.library.LibraryAdapter;
+import org.apache.asterix.external.library.LibraryFunction;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.entities.Library;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+
+public class ExternalLibraryUtils {
+
+    private static final Logger LOGGER = Logger.getLogger(ExternalLibraryUtils.class.getName());
+
+    public static void setUpExternaLibraries(boolean isMetadataNode) throws Exception {
+
+        // start by un-installing removed libraries (Metadata Node only)
+        Map<String, List<String>> uninstalledLibs = null;
+        if (isMetadataNode) {
+            uninstalledLibs = uninstallLibraries();
+        }
+
+        // get the directory of the libraries to be installed
+        File installLibDir = getLibraryInstallDir();
+        // directory exists?
+        if (installLibDir.exists()) {
+            // get the list of files in the directory
+            for (String dataverse : installLibDir.list()) {
+                File dataverseDir = new File(installLibDir, dataverse);
+                String[] libraries = dataverseDir.list();
+                for (String library : libraries) {
+                    // for each file (library), register library
+                    registerLibrary(dataverse, library, isMetadataNode, installLibDir);
+                    // is metadata node?
+                    if (isMetadataNode) {
+                        // get library file
+                        File libraryDir = new File(installLibDir.getAbsolutePath() + File.separator + dataverse
+                                + File.separator + library);
+                        // install if needed, i.e., add the functions, adapters, datasources, and parsers to the metadata <Not required for use>
+                        installLibraryIfNeeded(dataverse, libraryDir, uninstalledLibs);
+                    }
+                }
+            }
+        }
+    }
+
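+    /*
+     * For orientation, the on-disk layout implied by the methods in this class
+     * (paths are relative to the process working directory; angle-bracketed
+     * names are placeholders):
+     *
+     *   library/<dataverse>/<library>/<name>.jar   the single library jar
+     *   library/<dataverse>/<library>/<name>.xml   the single library descriptor
+     *   library/<dataverse>/<library>/lib/*.jar    optional dependency jars
+     *   uninstall/<dataverse>.<library>            marker: uninstall this library
+     */
+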
+    /**
+     * Uninstall libraries that have been marked for removal.
+     * @return a map from dataverse name to the list of libraries uninstalled from it.
+     * @throws Exception
+     */
+    private static Map<String, List<String>> uninstallLibraries() throws Exception {
+        Map<String, List<String>> uninstalledLibs = new HashMap<String, List<String>>();
+        // get the directory of the libraries to be uninstalled
+        File uninstallLibDir = getLibraryUninstallDir();
+        String[] uninstallLibNames;
+        // directory exists?
+        if (uninstallLibDir.exists()) {
+            // list files
+            uninstallLibNames = uninstallLibDir.list();
+            for (String uninstallLibName : uninstallLibNames) {
+                // the marker file is named <dataverse>.<library name>; split it into its components
+                String[] components = uninstallLibName.split("\\.");
+                String dataverse = components[0];
+                String libName = components[1];
+                // un-install
+                uninstallLibrary(dataverse, libName);
+                // delete the library file
+                new File(uninstallLibDir, uninstallLibName).delete();
+                // add the library to the list of uninstalled libraries
+                List<String> uninstalledLibsInDv = uninstalledLibs.get(dataverse);
+                if (uninstalledLibsInDv == null) {
+                    uninstalledLibsInDv = new ArrayList<String>();
+                    uninstalledLibs.put(dataverse, uninstalledLibsInDv);
+                }
+                uninstalledLibsInDv.add(libName);
+            }
+        }
+        return uninstalledLibs;
+    }
+
+    /**
+     * Remove the library from metadata completely.
+     * TODO Currently, external libraries only include functions and adapters. We need to extend this to include:
+     * 1. external data source
+     * 2. data parser
+     * @param dataverse
+     * @param libraryName
+     * @return true if the library was found and removed, false otherwise
+     * @throws AsterixException
+     * @throws RemoteException
+     * @throws ACIDException
+     */
+    protected static boolean uninstallLibrary(String dataverse, String libraryName)
+            throws AsterixException, RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            // begin transaction
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            // make sure dataverse exists
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+            if (dv == null) {
+                return false;
+            }
+            // make sure library exists
+            Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
+            if (library == null) {
+                return false;
+            }
+
+            // get dataverse functions
+            List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverse);
+            for (Function function : functions) {
+                // does function belong to library?
+                if (function.getName().startsWith(libraryName + "#")) {
+                    // drop the function
+                    MetadataManager.INSTANCE.dropFunction(mdTxnCtx,
+                            new FunctionSignature(dataverse, function.getName(), function.getArity()));
+                }
+            }
+
+            // get the dataverse adapters
+            List<DatasourceAdapter> adapters = MetadataManager.INSTANCE.getDataverseAdapters(mdTxnCtx, dataverse);
+            for (DatasourceAdapter adapter : adapters) {
+                // does the adapter belong to the library?
+                if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) {
+                    // drop the adapter (note: we do not check whether any feeds still use it)
+                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
+                }
+            }
+            // drop the library itself
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverse, libraryName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AsterixException(e);
+        }
+        return true;
+    }
+
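+    /*
+     * Example of the naming convention relied upon above: entities contributed
+     * by a library are prefixed with "<library>#". Uninstalling a library named
+     * "testlib" would therefore drop, e.g., function "testlib#mysum" and adapter
+     * "testlib#test_adapter" (the entity names here are illustrative only).
+     */
+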
+    /**
+     *  Each element of a library is installed as part of a transaction. Any
+     *  failure in installing an element does not affect installation of other
+     *  libraries.
+     */
+    protected static void installLibraryIfNeeded(String dataverse, final File libraryDir,
+            Map<String, List<String>> uninstalledLibs) throws Exception {
+
+        String libraryName = libraryDir.getName().trim();
+        List<String> uninstalledLibsInDv = uninstalledLibs.get(dataverse);
+        // was this library just un-installed?
+        boolean wasUninstalled = uninstalledLibsInDv != null && uninstalledLibsInDv.contains(libraryName);
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
+            if (libraryInMetadata != null && !wasUninstalled) {
+                // the library exists in metadata and was not uninstalled; nothing to do
+                // (note: an earlier version returned here without committing, leaving the metadata transaction open)
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return;
+            }
+
+            // Add library
+            MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverse, libraryName));
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Added library " + libraryName + " to Metadata");
+            }
+
+            // Get the descriptor
+            String[] libraryDescriptors = libraryDir.list(new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                    return name.endsWith(".xml");
+                }
+            });
+            if (libraryDescriptors.length == 0) {
+                // should be fine; the library binary was installed but there is no content to add to metadata
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return;
+            } else if (libraryDescriptors.length > 1) {
+                throw new Exception("More than 1 library descriptor defined");
+            }
+            // parse the descriptor only after validating that exactly one exists
+            ExternalLibrary library = getLibrary(new File(libraryDir + File.separator + libraryDescriptors[0]));
+
+            // Get the dataverse
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+            if (dv == null) {
+                MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverse,
+                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, IMetadataEntity.PENDING_NO_OP));
+            }
+            // Add functions
+            if (library.getLibraryFunctions() != null) {
+                for (LibraryFunction function : library.getLibraryFunctions().getLibraryFunction()) {
+                    String[] fargs = function.getArguments().trim().split(",");
+                    List<String> args = new ArrayList<String>();
+                    for (String arg : fargs) {
+                        args.add(arg);
+                    }
+                    Function f = new Function(dataverse, libraryName + "#" + function.getName().trim(), args.size(),
+                            args, function.getReturnType().trim(), function.getDefinition().trim(),
+                            library.getLanguage().trim(), function.getFunctionType().trim());
+                    MetadataManager.INSTANCE.addFunction(mdTxnCtx, f);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Installed function: " + libraryName + "#" + function.getName().trim());
+                    }
+                }
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Installed functions in library: " + libraryName);
+            }
+
+            // Add adapters
+            if (library.getLibraryAdapters() != null) {
+                for (LibraryAdapter adapter : library.getLibraryAdapters().getLibraryAdapter()) {
+                    String adapterFactoryClass = adapter.getFactoryClass().trim();
+                    String adapterName = libraryName + "#" + adapter.getName().trim();
+                    AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
+                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass,
+                            IDataSourceAdapter.AdapterType.EXTERNAL);
+                    MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Installed adapter: " + adapterName);
+                    }
+                }
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Installed adapters in library: " + libraryName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "Exception in installing library " + libraryName, e);
+            }
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+        }
+    }
+
+    /**
+     * Register the library's class loader with the external library manager.
+     * @param dataverse
+     * @param libraryName
+     * @param isMetadataNode
+     * @param installLibDir
+     * @throws Exception
+     */
+    protected static void registerLibrary(String dataverse, String libraryName, boolean isMetadataNode,
+            File installLibDir) throws Exception {
+        // get the class loader
+        ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryName);
+        // register it with the external library manager
+        ExternalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader);
+    }
+
+    /**
+     * Get the library descriptor from its XML file
+     * @param libraryXMLPath
+     * @return the unmarshalled ExternalLibrary descriptor
+     * @throws Exception
+     */
+    private static ExternalLibrary getLibrary(File libraryXMLPath) throws Exception {
+        JAXBContext configCtx = JAXBContext.newInstance(ExternalLibrary.class);
+        Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+        ExternalLibrary library = (ExternalLibrary) unmarshaller.unmarshal(libraryXMLPath);
+        return library;
+    }
+
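+    /*
+     * A hypothetical descriptor, shaped after the JAXB accessors used in
+     * installLibraryIfNeeded; the authoritative element names are defined by the
+     * ExternalLibrary JAXB bindings in asterix-external-data:
+     *
+     *   <externalLibrary>
+     *     <language>JAVA</language>
+     *     <libraryFunctions>
+     *       <libraryFunction>
+     *         <function_type>SCALAR</function_type>
+     *         <name>mysum</name>
+     *         <arguments>AINT32,AINT32</arguments>
+     *         <return_type>AINT32</return_type>
+     *         <definition>org.example.MySumFactory</definition>
+     *       </libraryFunction>
+     *     </libraryFunctions>
+     *     <libraryAdapters>
+     *       <libraryAdapter>
+     *         <name>test_adapter</name>
+     *         <factoryClass>org.example.TestAdapterFactory</factoryClass>
+     *       </libraryAdapter>
+     *     </libraryAdapters>
+     *   </externalLibrary>
+     */
+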
+    /**
+     * Get the class loader for the library
+     * @param dataverse
+     * @param libraryName
+     * @return a class loader over the library jar and its dependency jars
+     * @throws Exception
+     */
+    private static ClassLoader getLibraryClassLoader(String dataverse, String libraryName) throws Exception {
+        // Get a reference to the library directory
+        File installDir = getLibraryInstallDir();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Installing library " + libraryName + " in dataverse " + dataverse + "."
+                    + " Install Directory: " + installDir.getAbsolutePath());
+        }
+
+        // get a reference to the specific library dir
+        File libDir = new File(
+                installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
+        FilenameFilter jarFileFilter = new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.endsWith(".jar");
+            }
+        };
+
+        // Get the library jar <exactly one top-level jar file is allowed>
+        String[] jarsInLibDir = libDir.list(jarFileFilter);
+        if (jarsInLibDir.length > 1) {
+            throw new Exception("Incorrect library structure: found multiple library jars");
+        }
+        if (jarsInLibDir.length == 0) {
+            throw new Exception("Incorrect library structure: could not find library jar");
+        }
+
+        File libJar = new File(libDir, jarsInLibDir[0]);
+        // get the jar dependencies
+        File libDependencyDir = new File(libDir.getAbsolutePath() + File.separator + "lib");
+        int numDependencies = 1;
+        String[] libraryDependencies = null;
+        if (libDependencyDir.exists()) {
+            libraryDependencies = libDependencyDir.list(jarFileFilter);
+            numDependencies += libraryDependencies.length;
+        }
+
+        ClassLoader parentClassLoader = ExternalLibraryUtils.class.getClassLoader();
+        URL[] urls = new URL[numDependencies];
+        int count = 0;
+        // get url of library
+        urls[count++] = libJar.toURI().toURL();
+
+        // get urls for dependencies
+        if (libraryDependencies != null && libraryDependencies.length > 0) {
+            for (String dependency : libraryDependencies) {
+                File file = new File(libDependencyDir + File.separator + dependency);
+                urls[count++] = file.toURI().toURL();
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            StringBuilder logMesg = new StringBuilder("Classpath for library " + libraryName + "\n");
+            for (URL url : urls) {
+                logMesg.append(url.getFile() + "\n");
+            }
+            LOGGER.info(logMesg.toString());
+        }
+
+        // create and return the class loader
+        ClassLoader classLoader = new URLClassLoader(urls, parentClassLoader);
+        return classLoader;
+    }
+
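+    /*
+     * Sketch of how the class loader built above might later be used to
+     * instantiate a library-provided factory (hypothetical; actual lookups go
+     * through ExternalLibraryManager):
+     *
+     *   ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(dataverse, libraryName);
+     *   Object factory = cl.loadClass(adapterFactoryClass).newInstance();
+     */
+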
+    /**
+     *  @return the directory "$(pwd)/library": This needs to be improved
+     */
+    protected static File getLibraryInstallDir() {
+        String workingDir = System.getProperty("user.dir");
+        return new File(workingDir + File.separator + "library");
+    }
+
+    /**
+     * @return the directory "$(pwd)/uninstall": This needs to be improved
+     */
+    protected static File getLibraryUninstallDir() {
+        String workingDir = System.getProperty("user.dir");
+        return new File(workingDir + File.separator + "uninstall");
+    }
+
+}
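
Taken together, a node invokes this utility once at startup so that installed libraries are registered before any jobs reference them. A hypothetical bootstrap hook (only setUpExternaLibraries is from the code above):

    // Register all installed libraries on this node; pass true only on the
    // metadata node, which additionally records library contents in metadata.
    ExternalLibraryUtils.setUpExternaLibraries(true);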