Posted to commits@asterixdb.apache.org by bu...@apache.org on 2015/06/19 00:29:12 UTC

incubator-asterixdb git commit: Add a JSON REST API for external connectors that use existing AsterixDB datasets.

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 65100727f -> 0bae16118


Add a JSON REST API for external connectors that use existing AsterixDB datasets.

Change-Id: I674110b26262fbbd93030b252113e153ff4580ef
Reviewed-on: https://asterix-gerrit.ics.uci.edu/288
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/0bae1611
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/0bae1611
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/0bae1611

Branch: refs/heads/master
Commit: 0bae161187c485103d39df5c584c8d2e44ac898f
Parents: 6510072
Author: Yingyi Bu <bu...@gmail.com>
Authored: Wed Jun 17 22:45:28 2015 -0700
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Thu Jun 18 14:52:40 2015 -0700

----------------------------------------------------------------------
 asterix-app/pom.xml                             |  10 +
 .../api/http/servlet/ConnectorAPIServlet.java   | 151 +++++++++
 .../bootstrap/CCApplicationEntryPoint.java      |  12 +-
 .../http/servlet/ConnectorAPIServletTest.java   | 140 ++++++++
 .../metadata/declared/AqlMetadataProvider.java  | 327 ++++++++++---------
 5 files changed, 472 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
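
Before the diffs, a quick sketch of how the new endpoint is meant to be used: the ConnectorAPIServlet below is registered at /connector and answers GET requests carrying dataverseName and datasetName query parameters with a JSON object of the form {"splits": [{"ip": ..., "path": ...}, ...]}, or {"error": ...} on bad input. The minimal client below is not part of this change; the host, port, and class name are illustrative assumptions (the JSON API port really comes from AsterixExternalProperties):

    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    import org.json.JSONArray;
    import org.json.JSONObject;
    import org.json.JSONTokener;

    public class ConnectorClientSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical host/port for an AsterixDB JSON API server.
            URL url = new URL("http://localhost:19002/connector"
                    + "?dataverseName=Metadata&datasetName=Dataset");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            JSONObject response = new JSONObject(new JSONTokener(
                    new InputStreamReader(conn.getInputStream())));
            // Each split names the node that stores a partition and the
            // absolute file path of that partition on the node.
            JSONArray splits = response.getJSONArray("splits");
            for (int i = 0; i < splits.length(); i++) {
                JSONObject split = splits.getJSONObject(i);
                System.out.println(split.getString("ip") + ":" + split.getString("path"));
            }
            conn.disconnect();
        }
    }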


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0bae1611/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-app/pom.xml b/asterix-app/pom.xml
index fe11486..2db0974 100644
--- a/asterix-app/pom.xml
+++ b/asterix-app/pom.xml
@@ -212,6 +212,16 @@
 			<version>0.8.7-SNAPSHOT</version>
 			<scope>test</scope>
 		</dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+        </dependency>
+        <dependency>
+            <groupId>com.e-movimento.tinytools</groupId>
+            <artifactId>privilegedaccessor</artifactId>
+            <version>1.2.2</version>
+        </dependency>
 	</dependencies>
 
 </project>
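
The two dependencies added above back the new unit test further down: mockito-all supplies the mock()/spy()/when() facilities used to stub the servlet's collaborators, and privilegedaccessor supplies junit.extensions.PA, which invokes private members by a signature string. A self-contained sketch of the PA pattern, with a hypothetical stand-in class and method rather than project code:

    import junit.extensions.PA;

    public class PASketch {
        // A private method, standing in for ConnectorAPIServlet.formResponseObject.
        private String greet(String name) {
            return "Hello, " + name;
        }

        public static void main(String[] args) throws Exception {
            // privilegedaccessor resolves the method from its signature string
            // and invokes it reflectively, bypassing the access modifier.
            Object result = PA.invokeMethod(new PASketch(), "greet(java.lang.String)", "world");
            System.out.println(result); // prints: Hello, world
        }
    }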

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0bae1611/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
new file mode 100644
index 0000000..c5fb76b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * The REST API that takes a dataverse name and a dataset name as input
+ * and returns an array of file splits (IP, file-path) of the dataset in JSON.
+ * It is mostly used by external runtimes, e.g., Pregelix or IMRU, to pull
+ * data in parallel from existing AsterixDB datasets.
+ *
+ * @author yingyi
+ */
+public class ConnectorAPIServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        response.setContentType("application/json");
+        response.setCharacterEncoding("utf-8");
+        PrintWriter out = response.getWriter();
+        try {
+            JSONObject jsonResponse = new JSONObject();
+            String dataverseName = request.getParameter("dataverseName");
+            String datasetName = request.getParameter("datasetName");
+            if (dataverseName == null || datasetName == null) {
+                jsonResponse.put("error", "Parameter dataverseName or datasetName is null.");
+                out.write(jsonResponse.toString());
+                out.flush();
+                return;
+            }
+            ServletContext context = getServletContext();
+
+            IHyracksClientConnection hcc = null;
+            synchronized (context) {
+                hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
+            }
+
+            // Metadata transaction begins.
+            MetadataManager.INSTANCE.init();
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            // Retrieves file splits of the dataset.
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+            if (dataset == null) {
+                jsonResponse.put("error", "Dataset " + datasetName + " does not exist in dataverse "
+                        + dataverseName);
+                out.write(jsonResponse.toString());
+                out.flush();
+                return;
+            }
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
+                    datasetName, temp);
+
+            // Constructs the returned json object.
+            formResponseObject(jsonResponse, fileSplits, hcc.getNodeControllerInfos());
+            // Metadata transaction commits.
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            // Writes file splits.
+            out.write(jsonResponse.toString());
+            out.flush();
+        } catch (Exception e) {
+            e.printStackTrace();
+            out.println(e.getMessage());
+            e.printStackTrace(out);
+            out.flush();
+        }
+    }
+
+    private void formResponseObject(JSONObject jsonResponse, FileSplit[] fileSplits,
+            Map<String, NodeControllerInfo> nodeMap) throws Exception {
+        JSONArray partitions = new JSONArray();
+        // Generates file partitions.
+        for (FileSplit split : fileSplits) {
+            String ipAddress = nodeMap.get(split.getNodeName()).getNetworkAddress().getAddress().toString();
+            String path = split.getLocalFile().getFile().getAbsolutePath();
+            FilePartition partition = new FilePartition(ipAddress, path);
+            partitions.put(partition.toJSONObject());
+        }
+        // Generates the response object which contains the splits.
+        jsonResponse.put("splits", partitions);
+    }
+}
+
+class FilePartition {
+    private final String ipAddress;
+    private final String path;
+
+    public FilePartition(String ipAddress, String path) {
+        this.ipAddress = ipAddress;
+        this.path = path;
+    }
+
+    public String getIPAddress() {
+        return ipAddress;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public String toString() {
+        return ipAddress + ":" + path;
+    }
+
+    public JSONObject toJSONObject() throws JSONException {
+        JSONObject partition = new JSONObject();
+        partition.put("ip", ipAddress);
+        partition.put("path", path);
+        return partition;
+    }
+}
\ No newline at end of file
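
The Javadoc above says external runtimes use this endpoint to pull data in parallel. A minimal sketch of that consumption pattern, assuming only the response shape produced by formResponseObject (SplitGrouperSketch and groupByNode are illustrative names, not project code): group the splits by node IP so that a caller can schedule one local scan task per storage node.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.json.JSONArray;
    import org.json.JSONException;
    import org.json.JSONObject;

    public class SplitGrouperSketch {
        public static Map<String, List<String>> groupByNode(JSONObject response) throws JSONException {
            Map<String, List<String>> pathsByIp = new HashMap<String, List<String>>();
            JSONArray splits = response.getJSONArray("splits");
            for (int i = 0; i < splits.length(); i++) {
                JSONObject split = splits.getJSONObject(i);
                String ip = split.getString("ip");
                List<String> paths = pathsByIp.get(ip);
                if (paths == null) {
                    paths = new ArrayList<String>();
                    pathsByIp.put(ip, paths);
                }
                // All paths under one IP can be read by a worker co-located
                // with (or connected to) that node.
                paths.add(split.getString("path"));
            }
            return pathsByIp;
        }
    }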

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0bae1611/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 1320629..9fa9a76 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -24,6 +24,7 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
 
 import edu.uci.ics.asterix.api.http.servlet.APIServlet;
 import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.ConnectorAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
 import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
@@ -76,7 +77,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
 
         AsterixAppContextInfo.getInstance().getCCApplicationContext()
-                .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+        .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
 
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
@@ -88,7 +89,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         setupFeedServer(externalProperties);
         feedServer.start();
-        
+
         waitUntilServerStart(webServer);
         waitUntilServerStart(jsonAPIServer);
         waitUntilServerStart(feedServer);
@@ -100,9 +101,9 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
     }
 
-    private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception{
-        while(!webServer.isStarted()){
-            if(webServer.isFailed()){
+    private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
+        while (!webServer.isStarted()) {
+            if (webServer.isFailed()) {
                 throw new Exception("Server failed to start");
             }
             wait(1000);
@@ -156,6 +157,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         context.addServlet(new ServletHolder(new UpdateAPIServlet()), "/update");
         context.addServlet(new ServletHolder(new DDLAPIServlet()), "/ddl");
         context.addServlet(new ServletHolder(new AQLAPIServlet()), "/aql");
+        context.addServlet(new ServletHolder(new ConnectorAPIServlet()), "/connector");
     }
 
     private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0bae1611/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java
new file mode 100644
index 0000000..bf651ad
--- /dev/null
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.api.http.servlet;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.extensions.PA;
+import junit.framework.Assert;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.junit.Test;
+
+import edu.uci.ics.asterix.test.runtime.ExecutionTest;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class ConnectorAPIServletTest {
+
+    @Test
+    public void testGet() throws Exception {
+        // Starts the test AsterixDB cluster.
+        ExecutionTest.setUp();
+
+        // Configures a test connector API servlet.
+        ConnectorAPIServlet servlet = spy(new ConnectorAPIServlet());
+        ServletConfig mockServletConfig = mock(ServletConfig.class);
+        servlet.init(mockServletConfig);
+        Map<String, NodeControllerInfo> nodeMap = new HashMap<String, NodeControllerInfo>();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        PrintWriter outputWriter = new PrintWriter(outputStream);
+
+        // Creates mocks.
+        ServletContext mockContext = mock(ServletContext.class);
+        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
+        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+        HttpServletRequest mockRequest = mock(HttpServletRequest.class);
+        HttpServletResponse mockResponse = mock(HttpServletResponse.class);
+
+        // Sets up mock returns.
+        when(servlet.getServletContext()).thenReturn(mockContext);
+        when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata");
+        when(mockRequest.getParameter("datasetName")).thenReturn("Dataset");
+        when(mockResponse.getWriter()).thenReturn(outputWriter);
+        when(mockContext.getAttribute(anyString())).thenReturn(mockHcc);
+        when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap);
+        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+        // Calls ConnectorAPIServlet.doGet().
+        nodeMap.put("nc1", mockInfo1);
+        nodeMap.put("nc2", mockInfo2);
+        servlet.doGet(mockRequest, mockResponse);
+
+        // Constructs the actual response.
+        JSONTokener tokener = new JSONTokener(new InputStreamReader(
+                new ByteArrayInputStream(outputStream.toByteArray())));
+        JSONObject actualResponse = new JSONObject(tokener);
+
+        // Checks the correctness of results.
+        JSONArray splits = actualResponse.getJSONArray("splits");
+        String path = ((JSONObject) splits.get(0)).getString("path");
+        Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
+
+        // Tears down the test AsterixDB cluster.
+        ExecutionTest.tearDown();
+    }
+
+    @Test
+    public void testFormResponseObject() throws JSONException {
+        ConnectorAPIServlet servlet = new ConnectorAPIServlet();
+        JSONObject actualResponse = new JSONObject();
+        FileSplit[] splits = new FileSplit[2];
+        splits[0] = new FileSplit("nc1", "foo1");
+        splits[1] = new FileSplit("nc2", "foo2");
+        Map<String, NodeControllerInfo> nodeMap = new HashMap<String, NodeControllerInfo>();
+        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+
+        // Sets up mock returns.
+        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+        // Calls ConnectorAPIServlet.formResponseObject.
+        nodeMap.put("nc1", mockInfo1);
+        nodeMap.put("nc2", mockInfo2);
+        PA.invokeMethod(servlet,
+                "formResponseObject(org.json.JSONObject, edu.uci.ics.hyracks.dataflow.std.file.FileSplit[], "
+                        + "java.util.Map)", actualResponse, splits, nodeMap);
+
+        // Constructs expected response.
+        JSONObject expectedResponse = new JSONObject();
+        JSONArray splitsArray = new JSONArray();
+        JSONObject element1 = new JSONObject();
+        element1.put("ip", "127.0.0.1");
+        element1.put("path", splits[0].getLocalFile().getFile().getAbsolutePath());
+        JSONObject element2 = new JSONObject();
+        element2.put("ip", "127.0.0.2");
+        element2.put("path", splits[1].getLocalFile().getFile().getAbsolutePath());
+        splitsArray.put(element1);
+        splitsArray.put(element2);
+        expectedResponse.put("splits", splitsArray);
+
+        // Checks results.
+        Assert.assertEquals(expectedResponse.toString(), actualResponse.toString());
+    }
+}
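
A note on the new test class: testGet() boots a full test cluster through ExecutionTest.setUp() and is therefore an integration-style test, while testFormResponseObject() runs standalone against mocks. Assuming the standard Maven Surefire setup in this build, the class can be run in isolation with something like:

    mvn test -Dtest=ConnectorAPIServletTest -pl asterix-app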

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0bae1611/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 66a932e..5055cea 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -97,8 +97,6 @@ import edu.uci.ics.asterix.runtime.external.ExternalRTreeSearchOperatorDescripto
 import edu.uci.ics.asterix.runtime.formats.FormatUtils;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -106,6 +104,8 @@ import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearch
 import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -491,7 +491,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
@@ -568,14 +568,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 case INTERNAL:
                     feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
                             feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                                    .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
+                            .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
                             feedDesc, feedPolicy.getProperties());
                     break;
                 case EXTERNAL:
                     String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0];
                     feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
                             libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
-                                    .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
+                            .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
                     break;
             }
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -599,7 +599,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
             JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<String> feedLocations = new ArrayList<String>();
         String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS)
                 .split(",");
@@ -722,12 +722,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                                 new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
                                 compactionInfo.second, isSecondary ? new SecondaryIndexOperationTrackerProvider(
                                         dataset.getDatasetId()) : new PrimaryIndexOperationTrackerProvider(
-                                        dataset.getDatasetId()), rtcProvider,
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
-                        context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes);
+                                                dataset.getDatasetId()), rtcProvider,
+                                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
+                                                filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
+                                                context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                                                maxFilterFieldIndexes);
             } else {
                 // External dataset <- use the btree with buddy btree->
                 // Be Careful of Key Start Index ?
@@ -735,9 +735,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
                         compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
                                 dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
+                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
                                 .getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -839,12 +839,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                                 new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
                                 compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
                                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
-                                        nestedKeyType.getTypeTag(), comparatorFactories.length),
-                                storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
-                                filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
-                        context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes);
+                                        LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
+                                                nestedKeyType.getTypeTag(), comparatorFactories.length),
+                                                storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
+                                                filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
+                                                context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+                                                maxFilterFieldIndexes);
 
             } else {
                 // External Dataset
@@ -1039,9 +1039,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                             compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
                                     dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                            storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                            filterCmpFactories, btreeFields, filterFields, !temp));
+                                    LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                                    filterCmpFactories, btreeFields, filterFields, !temp));
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
                     splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -1053,7 +1053,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
-            throws AlgebricksException {
+                    throws AlgebricksException {
 
         String datasetName = dataSource.getId().getDatasetName();
         Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
@@ -1116,30 +1116,30 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+            : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                    txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
-                    dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                    datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
-                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
-                    true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+                            dataset, mdTxnCtx);
+                    IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                            datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
+                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                    LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+                                    true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                        op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                                appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                                comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+                    } else {
+                        op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                                appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                                fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
+                    }
+                    return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
 
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -1151,7 +1151,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
                 additionalNonKeyFields, recordDesc, context, spec, bulkload);
     }
@@ -1279,7 +1279,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
-            throws AlgebricksException {
+                    throws AlgebricksException {
 
         // Sanity checks.
         if (primaryKeys.size() > 1) {
@@ -1469,7 +1469,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
                 typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
                 false);
@@ -1477,7 +1477,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         // No filtering condition.
         if (filterExpr == null) {
             return null;
@@ -1591,37 +1591,37 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
                     jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                     ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                    modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
-                    dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                    datasetId), compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
-                                new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first,
-                                compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
-                                        dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
-                        modificationCallbackFactory, false, indexName);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+                            dataset, mdTxnCtx);
+                    IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                            datasetId), compactionInfo.first, compactionInfo.second,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                            storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                            btreeFields, filterFields, !temp);
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                        op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                                appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                                comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+                    } else {
+                        op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                                appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                                fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
+                                        new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first,
+                                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
+                                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                                storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
+                                                filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
+                                                modificationCallbackFactory, false, indexName);
+                    }
+                    return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         } catch (IOException e) {
@@ -1719,10 +1719,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             // SecondaryKeys.size() can be two if it comes from the bulkload.
             // In this case, [token, number of token] are the secondaryKeys.
-            if (!isPartitioned || secondaryKeys.size() > 1)
+            if (!isPartitioned || secondaryKeys.size() > 1) {
                 numTokenFields = secondaryKeys.size();
-            else if (isPartitioned && secondaryKeys.size() == 1)
+            } else if (isPartitioned && secondaryKeys.size() == 1) {
                 numTokenFields = secondaryKeys.size() + 1;
+            }
 
             ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
             ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
@@ -1791,46 +1792,46 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
                     jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                     ResourceType.LSM_INVERTED_INDEX) : new SecondaryIndexModificationOperationCallbackFactory(jobId,
-                    datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                    ResourceType.LSM_INVERTED_INDEX);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
-                    dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataFlowFactory;
-            if (!isPartitioned) {
-                indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                        datasetId), compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
-                        invertedIndexFieldsForNonBulkLoadOps, !temp);
-            } else {
-                indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
-                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
-                        invertedIndexFieldsForNonBulkLoadOps, !temp);
-            }
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
-                        numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
-            } else {
-                op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+                            datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                            ResourceType.LSM_INVERTED_INDEX);
+
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+                            dataset, mdTxnCtx);
+                    IIndexDataflowHelperFactory indexDataFlowFactory;
+                    if (!isPartitioned) {
+                        indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                                datasetId), compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                                filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                                invertedIndexFieldsForNonBulkLoadOps, !temp);
+                    } else {
+                        indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                                new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                                compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                                filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                                invertedIndexFieldsForNonBulkLoadOps, !temp);
+                    }
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                        op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
+                                numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                                appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                                invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
+                    } else {
+                        op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+                                appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                                appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                                invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+                                indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
+                    }
+                    return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         } catch (IOException e) {
@@ -1939,41 +1940,41 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
                     jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                     ResourceType.LSM_RTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                    modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
-                    dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
-                    RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
-                            dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
-                    storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
-                    filterCmpFactories, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        primaryComparatorFactories, btreeFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
-                        new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                                primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset
-                                        .getDatasetId()), compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
-                                        nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
-                                        .getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
-                                filterCmpFactories, filterFields, !temp), filterFactory,
-                        modificationCallbackFactory, false, indexName);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+                            dataset, mdTxnCtx);
+                    IIndexDataflowHelperFactory idfh = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
+                            RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
+                                    dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                                    proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                                    storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
+                                    filterCmpFactories, filterFields, !temp);
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                        op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                                appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                                primaryComparatorFactories, btreeFields, fieldPermutation,
+                                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+                    } else {
+                        op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                                appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                                splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
+                                new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                                        primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset
+                                                .getDatasetId()), compactionInfo.first, compactionInfo.second,
+                                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                                LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
+                                                        nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
+                                                        .getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
+                                                        filterCmpFactories, filterFields, !temp), filterFactory,
+                                                        modificationCallbackFactory, false, indexName);
+                    }
+                    return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
             throw new AlgebricksException(e);
         }
@@ -1997,7 +1998,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      * Calculate an estimate size of the bloom filter. Note that this is an
      * estimation which assumes that the data is going to be uniformly
      * distributed across all partitions.
-     * 
+     *
      * @param dataset
      * @return Number of elements that will be used to create a bloom filter per
      *         dataset per partition
@@ -2072,7 +2073,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return splits.toArray(new FileSplit[] {});
     }
 
-    private FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
+    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
             String targetIdxName, boolean temp) throws AlgebricksException {
         try {
             File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
@@ -2216,7 +2217,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     /**
      * Add HDFS scheduler and the cluster location constraint into the scheduler
-     * 
+     *
      * @param properties
      *            the original dataset properties
      * @return a new map containing the original dataset properties and the
@@ -2232,7 +2233,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     /**
      * Adapt the original properties to a string-object map
-     * 
+     *
      * @param properties
      *            the original properties
 * @return the new string-object map