Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/01 02:17:03 UTC

[2/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
index 0eacc15..e89a8db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.indexing.operators;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -49,18 +50,15 @@ public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExte
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        System.err.println("performing the operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()));
+        FileReference resourcePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
+        System.err.println("performing the operation on " + resourcePath.getFile().getAbsolutePath());
         // Get DataflowHelper
         IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
         // Get index
         IIndex index = indexHelper.getIndexInstance();
         // commit transaction
         ((ITwoPCIndex) index).commitTransaction();
-        System.err.println("operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()) + " Succeded");
+        System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
 
     }
 

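The operator above now resolves its index path through IndexFileNameUtil.getIndexAbsoluteFileRef instead of concatenating split paths by hand (the recover operator below gets the same treatment). A minimal standalone sketch of that resolution pattern, using hypothetical stand-in types; the real helper takes the Hyracks IOManager, and its body here is illustrative only:

    import java.io.File;

    public class IndexPathResolutionSketch {
        // Hypothetical stand-in for the IOManager's device list.
        static final File[] IO_DEVICES = { new File("/data/iodevice0"), new File("/data/iodevice1") };

        // Mirrors the shape of IndexFileNameUtil.getIndexAbsoluteFileRef:
        // a device id plus a partition-relative resource name yields an absolute file.
        static File getIndexAbsoluteFile(int ioDeviceId, String relativePath) {
            return new File(IO_DEVICES[ioDeviceId], relativePath);
        }

        public static void main(String[] args) {
            File resourcePath = getIndexAbsoluteFile(0, "storage/partition_0/dv/ds_idx_ds");
            System.err.println("performing the operation on " + resourcePath.getAbsolutePath());
        }
    }
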
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 8e7a288..9bdfaa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,9 +49,7 @@ public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExt
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.recoverTransaction();
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 2254f6f..09c65c8 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -27,8 +27,6 @@ import java.util.Set;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
-import org.kohsuke.args4j.Option;
-
 import org.apache.asterix.event.management.EventUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.MasterNode;
@@ -37,6 +35,7 @@ import org.apache.asterix.event.service.AsterixEventServiceUtil;
 import org.apache.asterix.installer.driver.InstallerDriver;
 import org.apache.asterix.installer.schema.conf.Configuration;
 import org.apache.asterix.installer.schema.conf.Zookeeper;
+import org.kohsuke.args4j.Option;
 
 public class ValidateCommand extends AbstractCommand {
 
@@ -97,7 +96,7 @@ public class ValidateCommand extends AbstractCommand {
             valid = false;
         } else {
             cluster = EventUtil.getCluster(clusterPath);
-            validateClusterProperties(cluster);
+            valid = valid & validateClusterProperties(cluster);
 
             Set<String> servers = new HashSet<String>();
             Set<String> serverIds = new HashSet<String>();
@@ -106,7 +105,7 @@ public class ValidateCommand extends AbstractCommand {
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null, null);
+                    masterNode.getLogDir(), null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
@@ -158,7 +157,7 @@ public class ValidateCommand extends AbstractCommand {
         return true;
     }
 
-    private void validateClusterProperties(Cluster cluster) {
+    private boolean validateClusterProperties(Cluster cluster) {
         List<String> tempDirs = new ArrayList<String>();
         if (cluster.getLogDir() != null && checkTemporaryPath(cluster.getLogDir())) {
             tempDirs.add("Log directory: " + cluster.getLogDir());
@@ -176,6 +175,11 @@ public class ValidateCommand extends AbstractCommand {
             LOGGER.warn(msg);
         }
 
+        if (cluster.getStore() == null || cluster.getStore().length() == 0) {
+            LOGGER.fatal("store not defined at cluster" + ERROR);
+            return false;
+        }
+        return true;
     }
 
     private boolean validateNodeConfiguration(Node node, Cluster cluster) {
@@ -201,14 +205,6 @@ public class ValidateCommand extends AbstractCommand {
             }
         }
 
-        if (node.getStore() == null || node.getStore().length() == 0) {
-            if (!cluster.getMasterNode().getId().equals(node.getId())
-                    && (cluster.getStore() == null || cluster.getStore().length() == 0)) {
-                valid = false;
-                LOGGER.fatal("store not defined at cluster/node level for node: " + node.getId() + ERROR);
-            }
-        }
-
         if (node.getIodevices() == null || node.getIodevices().length() == 0) {
             if (!cluster.getMasterNode().getId().equals(node.getId())
                     && (cluster.getIodevices() == null || cluster.getIodevices().length() == 0)) {

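The validation rework has two visible effects: the store check moves from the node level to the cluster level, and validateClusterProperties now feeds the overall result through the same non-short-circuit accumulation already used for the node checks. A tiny self-contained sketch of that idiom (check names are illustrative):

    public class ValidationAccumulationSketch {
        static boolean clusterPropertiesValid() { System.out.println("cluster store missing"); return false; }
        static boolean nodeConfigurationValid() { System.out.println("node iodevices missing"); return false; }

        public static void main(String[] args) {
            boolean valid = true;
            // '&' (not '&&') evaluates the right-hand side even when valid is already false,
            // so every failing check still reports its own message
            valid = valid & clusterPropertiesValid();
            valid = valid & nodeConfigurationValid();
            System.out.println("overall valid = " + valid);
        }
    }
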
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
index 56da6ee..1ac60ba 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
@@ -48,14 +48,11 @@ public class InstallerUtil {
         String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
         String[] storeDirs = null;
         StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
+        String storeDirValue = cluster.getStore();
         if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
+            throw new IllegalStateException(" Store not defined for node " + node.getId());
         }
+        storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
 
         storeDirs = storeDirValue.split(",");
         for (String ns : storeDirs) {
@@ -66,8 +63,8 @@ public class InstallerUtil {
         return nodeDataStore.toString();
     }
 
-    public static AsterixConfiguration getAsterixConfiguration(String asterixConf) throws FileNotFoundException,
-            IOException, JAXBException {
+    public static AsterixConfiguration getAsterixConfiguration(String asterixConf)
+            throws FileNotFoundException, IOException, JAXBException {
         if (asterixConf == null) {
             asterixConf = InstallerDriver.getManagixHome() + File.separator + DEFAULT_ASTERIX_CONFIGURATION_PATH;
         }

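With the node-level store gone, InstallerUtil derives every node's data directories from the single cluster-level store value and always prefixes the node id. A self-contained sketch of the resulting composition, assuming illustrative method and argument names:

    import java.io.File;

    public class StorePathSketch {
        // Mirrors the reworked logic: cluster-level store only, node id always in the subdir.
        static String nodeDataStore(String clusterStore, String nodeId, String instanceName) {
            if (clusterStore == null) {
                throw new IllegalStateException("Store not defined for node " + nodeId);
            }
            String storeDataSubDir = nodeId + File.separator + instanceName + File.separator + "data" + File.separator;
            StringBuilder sb = new StringBuilder();
            for (String ns : clusterStore.split(",")) {
                sb.append(ns.trim()).append(File.separator).append(storeDataSubDir).append(',');
            }
            sb.setLength(sb.length() - 1); // drop the trailing comma
            return sb.toString();
        }

        public static void main(String[] args) {
            // prints /mnt/store1/nc1/myInstance/data/,/mnt/store2/nc1/myInstance/data/
            System.out.println(nodeDataStore("/mnt/store1,/mnt/store2", "nc1", "myInstance"));
        }
    }
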
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index f22b2f1..5317fc2 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,6 +32,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -62,8 +63,10 @@ import org.apache.asterix.metadata.entities.Node;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
@@ -108,7 +111,6 @@ public class MetadataBootstrap {
     private static IIOManager ioManager;
 
     private static String metadataNodeName;
-    private static String metadataStore;
     private static Set<String> nodeNames;
     private static String outputDir;
 
@@ -147,9 +149,7 @@ public class MetadataBootstrap {
 
         AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
         metadataNodeName = metadataProperties.getMetadataNodeName();
-        metadataStore = metadataProperties.getMetadataStore();
         nodeNames = metadataProperties.getNodeNames();
-        // nodeStores = asterixProperity.getStores();
 
         dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
         localResourceRepository = runtimeContext.getLocalResourceRepository();
@@ -375,11 +375,14 @@ public class MetadataBootstrap {
 
     private static void enlistMetadataDataset(IMetadataIndex index, boolean create, MetadataTransactionContext mdTxnCtx)
             throws Exception {
-        String filePath = ioManager.getIODevices().get(runtimeContext.getMetaDataIODeviceId()).getPath()
-                + File.separator
-                + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
-                        runtimeContext.getMetaDataIODeviceId());
-        FileReference file = new FileReference(new File(filePath));
+        ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
+        int metadataDeviceId = metadataPartition.getIODeviceNum();
+        String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
+                AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
+                metadataPartition.getPartitionId());
+        String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
+        FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
                 .getVirtualBufferCaches(index.getDatasetId().getId());
         ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -391,7 +394,7 @@ public class MetadataBootstrap {
                 ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
                 : new BaseOperationTracker(index.getDatasetId().getId(),
                         dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
-        final String path = file.getFile().getPath();
+        final String absolutePath = file.getFile().getPath();
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -409,12 +412,13 @@ public class MetadataBootstrap {
             ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
-            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
-            dataLifecycleManager.register(path, lsmBtree);
+            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
+                    metadataPartition.getPartitionId(), absolutePath));
+            dataLifecycleManager.register(absolutePath, lsmBtree);
         } else {
-            final LocalResource resource = localResourceRepository.getResourceByName(path);
+            final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
+            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -424,7 +428,7 @@ public class MetadataBootstrap {
                         opTracker, runtimeContext.getLSMIOScheduler(),
                         LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                         null, null, null, null, true);
-                dataLifecycleManager.register(path, lsmBtree);
+                dataLifecycleManager.register(absolutePath, lsmBtree);
             }
         }
 
@@ -529,4 +533,4 @@ public class MetadataBootstrap {
             MetadataManager.INSTANCE.releaseWriteLatch();
         }
     }
-}
+}
\ No newline at end of file

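MetadataBootstrap now derives the metadata index path from the metadata ClusterPartition (partition id plus I/O device number) instead of a dedicated metadata store. A minimal sketch of the path composition with stand-in values; prepareStoragePartitionPath mirrors the helper introduced in SplitsAndConstraintsUtil further down:

    import java.io.File;

    public class MetadataPathSketch {
        static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
            return storageDirName + File.separator + "partition_" + partitionId;
        }

        public static void main(String[] args) {
            int metadataPartitionId = 0;       // stand-in for metadataPartition.getPartitionId()
            String storageDirName = "storage"; // stand-in for getStorageDirectoryName()
            String fileNameRelativePath = "Metadata" + File.separator + "Dataset_idx_Dataset";
            String resourceName = prepareStoragePartitionPath(storageDirName, metadataPartitionId)
                    + File.separator + fileNameRelativePath;
            System.out.println(resourceName); // storage/partition_0/Metadata/Dataset_idx_Dataset
        }
    }
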
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
deleted file mode 100644
index 92ab90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.annotations.TypeDataGen;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataManager;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class AqlCompiledMetadataDeclarations {
-    private static Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
-
-    // We are assuming that there is a one AqlCompiledMetadataDeclarations per
-    // transaction.
-    private final MetadataTransactionContext mdTxnCtx;
-    private String dataverseName = null;
-    private FileSplit outputFile;
-    private Map<String, String[]> stores;
-    private IDataFormat format;
-    private Map<String, String> config;
-
-    private final Map<String, IAType> types;
-    private final Map<String, TypeDataGen> typeDataGenMap;
-    private final IAWriterFactory writerFactory;
-
-    private IMetadataManager metadataManager = MetadataManager.INSTANCE;
-    private boolean isConnected = false;
-
-    public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
-            Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
-        this.mdTxnCtx = mdTxnCtx;
-        this.dataverseName = dataverseName;
-        this.outputFile = outputFile;
-        this.config = config;
-        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
-        if (stores == null && online) {
-            this.stores = metadataProperties.getStores();
-        } else {
-            this.stores = stores;
-        }
-        this.types = types;
-        this.typeDataGenMap = typeDataGenMap;
-        this.writerFactory = writerFactory;
-    }
-
-    public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
-        if (isConnected) {
-            throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
-        }
-        Dataverse dv;
-        try {
-            dv = metadataManager.getDataverse(mdTxnCtx, dvName);
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-        if (dv == null) {
-            throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
-        }
-        dataverseName = dvName;
-        isConnected = true;
-        try {
-            format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-    }
-
-    public void disconnectFromDataverse() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You are not connected to any dataverse");
-        }
-        dataverseName = null;
-        format = null;
-        isConnected = false;
-    }
-
-    public boolean isConnectedToDataverse() {
-        return isConnected;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public FileSplit getOutputFile() {
-        return outputFile;
-    }
-
-    public IDataFormat getFormat() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You need first to connect to a dataverse.");
-        }
-        return format;
-    }
-
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public IAType findType(String typeName) {
-        Datatype type;
-        try {
-            type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
-        } catch (Exception e) {
-            throw new IllegalStateException();
-        }
-        if (type == null) {
-            throw new IllegalStateException();
-        }
-        return type.getDatatype();
-    }
-
-    public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
-        NodeGroup ng;
-        try {
-            ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        if (ng == null) {
-            throw new AlgebricksException("No node group with this name " + nodeGroupName);
-        }
-        return ng.getNodeNames();
-    }
-
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
-    public Dataset findDataset(String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public void setOutputFile(FileSplit outputFile) {
-        this.outputFile = outputFile;
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-            String datasetName, String targetIdxName) throws AlgebricksException {
-        FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
-            throws AlgebricksException {
-
-        File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
-        Dataset dataset = findDataset(datasetName);
-        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
-            throw new AlgebricksException("Not an internal dataset");
-        }
-        List<String> nodeGroup = findNodeGroupNodeNames(dataset.getNodeGroupName());
-        if (nodeGroup == null) {
-            throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-        }
-
-        List<FileSplit> splitArray = new ArrayList<FileSplit>();
-        for (String nd : nodeGroup) {
-            String[] nodeStores = stores.get(nd);
-            if (nodeStores == null) {
-                LOGGER.warning("Node " + nd + " has no stores.");
-                throw new AlgebricksException("Node " + nd + " has no stores.");
-            } else {
-                for (int j = 0; j < nodeStores.length; j++) {
-                    File f = new File(nodeStores[j] + File.separator + relPathFile);
-                    splitArray.add(new FileSplit(nd, new FileReference(f)));
-                }
-            }
-        }
-        FileSplit[] splits = new FileSplit[splitArray.size()];
-        int i = 0;
-        for (FileSplit fs : splitArray) {
-            splits[i++] = fs;
-        }
-        return splits;
-    }
-
-    public String getRelativePath(String fileName) {
-        return dataverseName + File.separator + fileName;
-    }
-
-    public Map<String, TypeDataGen> getTypeDataGenMap() {
-        return typeDataGenMap;
-    }
-
-    public Map<String, IAType> getTypeDeclarations() {
-        return types;
-    }
-
-    public IAWriterFactory getWriterFactory() {
-        return writerFactory;
-    }
-
-    public MetadataTransactionContext getMetadataTransactionContext() {
-        return mdTxnCtx;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
index ad06737..e960336 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
@@ -19,10 +19,6 @@
 
 package org.apache.asterix.metadata.declared;
 
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
@@ -49,16 +45,6 @@ public class AqlLogicalPlanAndMetadataImpl implements ILogicalPlanAndMetadata {
 
     @Override
     public AlgebricksPartitionConstraint getClusterLocations() {
-        Map<String, String[]> stores = metadataProvider.getAllStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String k : stores.keySet()) {
-            String[] nodeStores = stores.get(k);
-            for (int j = 0; j < nodeStores.length; j++) {
-                locs.add(k);
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
+        return metadataProvider.getClusterLocations();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d61d323..745f436 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -91,6 +91,7 @@ import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.metadata.feeds.FeedUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -148,7 +149,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -160,16 +160,13 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@@ -2095,10 +2092,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return jobId;
     }
 
-    public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
-        return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
-    }
-
     public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
             throws AlgebricksException {
         return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
@@ -2128,7 +2121,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
                 .getNodeNames();
         for (String nd : nodeGroup) {
-            numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+            numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
         return numElementsHint /= numPartitions;
     }
@@ -2141,88 +2134,17 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
             String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
         FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return splitProviderAndPartitionConstraints(splits);
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraints(splits);
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
             String dataverse) {
-        FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
-            FileSplit[] splits) {
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
-        File relPathFile = new File(dataverseName);
-        List<FileSplit> splits = new ArrayList<FileSplit>();
-        for (Map.Entry<String, String[]> entry : stores.entrySet()) {
-            String node = entry.getKey();
-            String[] nodeStores = entry.getValue();
-            if (nodeStores == null) {
-                continue;
-            }
-            for (int i = 0; i < nodeStores.length; i++) {
-                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
-                String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(node);
-                for (int j = 0; j < nodeStores.length; j++) {
-                    for (int k = 0; k < numIODevices; k++) {
-                        File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                        splits.add(new FileSplit(node, new FileReference(f), k));
-                    }
-                }
-            }
-        }
-        return splits.toArray(new FileSplit[] {});
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
     }
 
     public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
             String targetIdxName, boolean temp) throws AlgebricksException {
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    int numIODevices;
-                    if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
-                        numIODevices = 1;
-                    } else {
-                        numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                    }
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    for (int j = 0; j < nodeStores.length; j++) {
-                        for (int k = 0; k < numIODevices; k++) {
-                            File f = new File(ioDevices[k] + File.separator + nodeStores[j]
-                                    + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
-                                    + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), k));
-                        }
-                    }
-                }
-            }
-            return splitArray.toArray(new FileSplit[0]);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
     }
 
     private static Map<String, String> initializeAdapterFactoryMapping() {
@@ -2256,10 +2178,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return adapter;
     }
 
-    private static String getRelativePath(String dataverseName, String fileName) {
-        return dataverseName + File.separator + fileName;
-    }
-
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
         try {
             return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
@@ -2307,19 +2225,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     public AlgebricksPartitionConstraint getClusterLocations() {
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
     public IDataFormat getFormat() {
@@ -2334,7 +2240,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      * @return a new map containing the original dataset properties and the
      *         scheduler/locations
      */
-    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+    private static Map<String, Object> wrapProperties(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         // wrappedProperties.put(SCHEDULER, hdfsScheduler);
@@ -2349,7 +2255,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      *            the original properties
      * @return the new string-object map
      */
-    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+    private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         return wrappedProperties;
@@ -2357,58 +2263,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    // Only the first partition when create
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    if (create) {
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            File f = new File(
-                                    ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                        }
-                    } else {
-                        int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            for (int k = 0; k < numIODevices; k++) {
-                                File f = new File(
-                                        ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                                splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                            }
-                        }
-                    }
-                }
-            }
-            FileSplit[] splits = new FileSplit[splitArray.size()];
-            int i = 0;
-            for (FileSplit fs : splitArray) {
-                splits[i++] = fs;
-            }
-            return splits;
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+                datasetName, targetIdxName, create);
     }
 
     public AsterixStorageProperties getStorageProperties() {

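All of the split enumeration deleted from AqlMetadataProvider moves into the new SplitsAndConstraintsUtil (next diff); what survives unchanged is the invariant that location constraints are derived one-to-one from the splits, in split order. A standalone sketch with hypothetical stand-in types:

    import java.util.Arrays;

    public class ConstraintSketch {
        // Stand-in for FileSplit and the absolute partition constraint (hypothetical).
        record Split(String nodeName, String path) {}

        public static void main(String[] args) {
            Split[] splits = {
                    new Split("nc1", "storage/partition_0/dv/ds_idx_ds"),
                    new Split("nc2", "storage/partition_1/dv/ds_idx_ds") };
            // one location entry per split, in split order, as in splitProviderAndPartitionConstraints
            String[] locations = Arrays.stream(splits).map(Split::nodeName).toArray(String[]::new);
            System.out.println(Arrays.toString(locations)); // prints [nc1, nc2]
        }
    }
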
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
new file mode 100644
index 0000000..5ef58cd
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitsAndConstraintsUtil {
+
+    public static final String PARTITION_DIR_PREFIX = "partition_";
+    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+
+    private static FileSplit[] splitsForDataverse(String dataverseName) {
+        File relPathFile = new File(dataverseName);
+        List<FileSplit> splits = new ArrayList<FileSplit>();
+        //get all partitions
+        ClusterPartition[] clusterPartition = AsterixClusterProperties.INSTANCE.getClusterPartitons();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        // one split per cluster partition
+        for (int j = 0; j < clusterPartition.length; j++) {
+            File f = new File(prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId())
+                    + File.separator + relPathFile);
+            splits.add(getFileSplitForClusterPartition(clusterPartition[j], f));
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
+    public static FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nd : nodeGroup) {
+                int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nd);
+                // currently this case is never executed since the metadata node group doesn't exist
+                if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+                    numPartitions = 1;
+                }
+
+                for (int k = 0; k < numPartitions; k++) {
+                    //format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
+                    File f = new File(prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
+                            + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
+                            + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private static FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nodeId : nodeGroup) {
+                //get node partitions
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeId);
+                String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+                int firstPartition = 0;
+                if (create) {
+                    // Only the first partition when create
+                    File f = new File(
+                            prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId())
+                                    + File.separator + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+                } else {
+                    for (int k = 0; k < nodePartitions.length; k++) {
+                        File f = new File(prepareStoragePartitionPath(storageDirName,
+                                nodePartitions[k].getPartitionId()) + File.separator + relPathFile);
+                        splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+                    }
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+            String dataverse) {
+        FileSplit[] splits = splitsForDataverse(dataverse);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
+            boolean create) throws AlgebricksException {
+        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+            FileSplit[] splits) {
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        String[] loc = new String[splits.length];
+        for (int p = 0; p < splits.length; p++) {
+            loc[p] = splits[p].getNodeName();
+        }
+        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
+        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+    }
+
+    private static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+        return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
+                partition.getPartitionId());
+    }
+
+    public static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
+        return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitionId;
+    }
+
+    private static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+        return dataverseName + File.separator + datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    }
+}

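The split computation above roots every index under a cluster-partition directory instead of an I/O-device subdirectory. The following is a minimal sketch of the resulting path layout, assuming PARTITION_DIR_PREFIX is "partition_" and DATASET_INDEX_NAME_SEPARATOR is "_idx_" (both constants are defined in this class but outside the hunk); the dataverse, dataset, and index names are hypothetical:

    import java.io.File;

    public class SplitPathSketch {
        // assumption: mirrors PARTITION_DIR_PREFIX, which is not shown in this hunk
        private static final String PARTITION_DIR_PREFIX = "partition_";
        // assumption: mirrors DATASET_INDEX_NAME_SEPARATOR, not shown in this hunk
        private static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";

        static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
            return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitionId;
        }

        public static void main(String[] args) {
            // hypothetical dataverse/dataset/index names, for illustration only
            String relPathFile = "MyDataverse" + File.separator + "MyDataset"
                    + DATASET_INDEX_NAME_SEPARATOR + "MyIndex";
            // a split rooted in cluster partition 2:
            System.out.println(prepareStoragePartitionPath("storage", 2) + File.separator + relPathFile);
            // prints (on Unix): storage/partition_2/MyDataverse/MyDataset_idx_MyIndex
        }
    }

Rooting paths at the partition id means nothing below the partition directory encodes the owning node or I/O device, which is presumably what allows a partition to be taken over wholesale by another node (see the fault-tolerance TODO in AsterixClusterProperties below).
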
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index f2482da..95eea63 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,6 +34,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -50,7 +52,7 @@ public class AsterixClusterProperties {
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
 
     private static final String IO_DEVICES = "iodevices";
-
+    private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
     private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
 
     private final Cluster cluster;
@@ -59,6 +61,9 @@ public class AsterixClusterProperties {
 
     private boolean globalRecoveryCompleted = false;
 
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
         if (is != null) {
@@ -66,37 +71,73 @@ public class AsterixClusterProperties {
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
-
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
             }
         } else {
             cluster = null;
         }
+        //if this is the CC process
+        AsterixAppContextInfo appCtx = AsterixAppContextInfo.getInstance();
+        if (appCtx != null && appCtx.getCCApplicationContext() != null) {
+            node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
+            clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
+        }
     }
 
     private ClusterState state = ClusterState.UNUSABLE;
 
     public synchronized void removeNCConfiguration(String nodeId) {
+        updateNodePartitions(nodeId, false);
         ncConfiguration.remove(nodeId);
-        if (ncConfiguration.keySet().size() != AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.UNUSABLE;
-            LOGGER.info("Cluster now is in UNSABLE state");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Removing configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+        //TODO implement fault tolerance as follows:
+        //1. collect the partitions of the failed NC
+        //2. for each partition, request a remote replica to take over
+        //3. wait until each remote replica completes the recovery for the lost partitions
+        //4. update the cluster state
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
         ncConfiguration.put(nodeId, configuration);
-        if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.ACTIVE;
-        }
+        updateNodePartitions(nodeId, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+    }
+
+    private synchronized void updateNodePartitions(String nodeId, boolean added) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        //if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                //set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                } else {
+                    p.setActiveNodeId(null);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    private synchronized void updateClusterState() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNSABLE state");
+                return;
+            }
+        }
+        //if all storage partitions are active, then the cluster is active
+        state = ClusterState.ACTIVE;
+        LOGGER.info("Cluster is now ACTIVE");
     }
 
     /**
@@ -162,20 +203,14 @@ public class AsterixClusterProperties {
     }
 
     private synchronized void resetClusterPartitionConstraint() {
-        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+        ArrayList<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
     }
 
     public boolean isGlobalRecoveryCompleted() {
@@ -194,7 +229,34 @@ public class AsterixClusterProperties {
         return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
     }
 
-    public static int getNumberOfNodes(){
+    public static int getNumberOfNodes() {
         return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
     }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        if (node2PartitionsMap.containsKey(node)) {
+            return node2PartitionsMap.get(node).length;
+        }
+        return 0;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitions() {
+        return clusterPartitions.values().toArray(new ClusterPartition[] {});
+    }
+
+    public String getStorageDirectoryName() {
+        if (cluster != null) {
+            return cluster.getStore();
+        }
+        //virtual cluster without cluster config file
+        return DEFAULT_STORAGE_DIR_NAME;
+    }
 }

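addNCConfiguration/removeNCConfiguration now drive cluster state from per-partition bookkeeping rather than from a count of registered NCs: a joining or leaving node flips the active flag on the partitions it owns, and the cluster is ACTIVE only when every cluster partition has an active node. A minimal sketch of that logic with simplified stand-in types (the real ClusterPartition in org.apache.asterix.common.cluster carries more state, e.g. the I/O device number):

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionStateSketch {
        // simplified stand-in for org.apache.asterix.common.cluster.ClusterPartition
        static class Partition {
            boolean active;
            String activeNodeId;
        }

        private final Map<String, Partition[]> node2Partitions = new HashMap<>();

        // mirrors updateNodePartitions(): flip the partitions owned by a node
        // when it joins (added == true) or leaves (added == false)
        synchronized void updateNodePartitions(String nodeId, boolean added) {
            Partition[] partitions = node2Partitions.get(nodeId);
            if (partitions == null) {
                return; // not a storage node
            }
            for (Partition p : partitions) {
                p.active = added;
                p.activeNodeId = added ? nodeId : null;
            }
        }

        // mirrors updateClusterState(): usable only when every partition is active
        synchronized boolean allPartitionsActive() {
            for (Partition[] partitions : node2Partitions.values()) {
                for (Partition p : partitions) {
                    if (!p.active) {
                        return false;
                    }
                }
            }
            return true;
        }

        public static void main(String[] args) {
            PartitionStateSketch s = new PartitionStateSketch();
            s.node2Partitions.put("nc1", new Partition[] { new Partition(), new Partition() });
            s.updateNodePartitions("nc1", true);
            System.out.println("cluster usable: " + s.allPartitionsActive()); // true
        }
    }

Under the TODO sketched in removeNCConfiguration, a failed node's partitions would presumably be reassigned to a remote replica (setActiveNodeId pointed at the replica) instead of being deactivated outright.
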
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 58bc44e..794f867 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 48ebff3..06a1957 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,12 +47,12 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 23eb2be..f2a6820 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 6e0394a..8d838a3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index 0566367..15224e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -33,7 +33,7 @@ public class PersistentLocalResourceFactory implements ILocalResourceFactory {
     }
 
     @Override
-    public LocalResource createLocalResource(long resourceId, String resourceName, int partition) {
-        return new LocalResource(resourceId, resourceName, partition, resourceType, localResourceMetadata);
+    public LocalResource createLocalResource(long resourceId, String resourceName, int partition, String resourcePath) {
+        return new LocalResource(resourceId, resourceName, partition, resourcePath, resourceType, localResourceMetadata);
     }
 }

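createLocalResource() now threads a resourcePath through to LocalResource in addition to the resource name, so resources are located by a partition-qualified path; the PersistentLocalResourceRepository diff below traverses exactly that layout (<storage>/partition_#/<dataverse>/<index>). A sketch of where a resource and its metadata file would live under this scheme, assuming the partition directory prefix is "partition_" and the metadata file is named ".metadata" (METADATA_FILE_NAME in the repository class); all names are illustrative:

    import java.io.File;

    public class ResourcePathSketch {
        public static void main(String[] args) {
            String mountPoint = "/data/asterix/";   // hypothetical iodevice mount
            String storageDir = "storage";          // cluster store directory name
            int partitionId = 2;                    // cluster partition id

            // logical name vs. partition-qualified physical path
            String resourceName = "MyDataverse" + File.separator + "MyDataset_idx_MyDataset";
            String resourcePath = mountPoint + storageDir + File.separator + "partition_" + partitionId
                    + File.separator + resourceName;

            System.out.println(resourcePath);
            // the per-resource metadata file then lives at:
            System.out.println(resourcePath + File.separator + ".metadata");
        }
    }
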
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 8ae3eb1..52fd806 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -64,8 +64,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
-            throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
         for (int i = 0; i < mountPoints.length; i++) {
@@ -123,7 +122,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             }
 
             LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
-                    storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+                    storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+                    storageRootDirPath);
             insert(rootLocalResource);
             LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
         }
@@ -131,13 +131,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     @Override
-    public LocalResource getResourceByName(String name) throws HyracksDataException {
-        LocalResource resource = resourceCache.getIfPresent(name);
+    public LocalResource getResourceByPath(String path) throws HyracksDataException {
+        LocalResource resource = resourceCache.getIfPresent(path);
         if (resource == null) {
-            File resourceFile = getLocalResourceFileByName(name);
+            File resourceFile = getLocalResourceFileByName(path);
             if (resourceFile.exists()) {
                 resource = readLocalResource(resourceFile);
-                resourceCache.put(name, resource);
+                resourceCache.put(path, resource);
             }
         }
         return resource;
@@ -145,13 +145,15 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
     @Override
     public synchronized void insert(LocalResource resource) throws HyracksDataException {
-        File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
+        File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
         if (resourceFile.exists()) {
             throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+        } else {
+            resourceFile.getParentFile().mkdirs();
         }
 
         if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-            resourceCache.put(resource.getResourceName(), resource);
+            resourceCache.put(resource.getResourcePath(), resource);
         }
 
         FileOutputStream fos = null;
@@ -182,18 +184,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
             //if replication enabled, send resource metadata info to remote nodes
             if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
+                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
                 createReplicationJob(ReplicationOperation.REPLICATE, filePath);
             }
         }
     }
 
     @Override
-    public synchronized void deleteResourceByName(String name) throws HyracksDataException {
-        File resourceFile = getLocalResourceFileByName(name);
+    public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+        File resourceFile = getLocalResourceFileByName(resourcePath);
         if (resourceFile.exists()) {
             resourceFile.delete();
-            resourceCache.invalidate(name);
+            resourceCache.invalidate(resourcePath);
 
             //if replication enabled, delete resource from remote replicas
             if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
@@ -204,8 +206,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         }
     }
 
-    private static File getLocalResourceFileByName(String resourceName) {
-        return new File(resourceName + File.separator + METADATA_FILE_NAME);
+    private static File getLocalResourceFileByName(String resourcePath) {
+        return new File(resourcePath + File.separator + METADATA_FILE_NAME);
     }
 
     public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -220,25 +222,21 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             }
 
             //load all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        resourcesMap.put(localResource.getResourceId(), localResource);
-                                                    }
-                                                }
+            File[] partitions = storageRootDir.listFiles();
+            if (partitions == null) {
+                continue;
+            }
+            for (File partition : partitions) {
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                resourcesMap.put(localResource.getResourceId(), localResource);
                                             }
                                         }
                                     }
@@ -263,27 +261,23 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
                 continue;
             }
 
-            //traverse all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        maxResourceId = Math.max(maxResourceId,
-                                                                localResource.getResourceId());
-                                                    }
-                                                }
+            File[] partitions = storageRootDir.listFiles();
+            if (partitions == null) {
+                continue;
+            }
+            for (File partition : partitions) {
+                //traverse all local resources.
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
                                             }
                                         }
                                     }
@@ -305,8 +299,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             if (!baseDir.endsWith(System.getProperty("file.separator"))) {
                 baseDir += System.getProperty("file.separator");
             }
-            String fileName = new String(baseDir + METADATA_FILE_NAME);
-            return fileName;
+            return baseDir + METADATA_FILE_NAME;
         }
     }
 
@@ -376,6 +369,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
     /**
      * Deletes physical files of all data verses.
+     *
      * @param deleteStorageMetadata
      * @throws IOException
      */