You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/01 02:17:02 UTC

[1/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master e3e13735b -> 1d5cf6403


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f602156..701e529 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,11 +44,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -70,11 +72,14 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -295,10 +300,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //get datasetLifeCycleManager
             IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                     .getDatasetLifecycleManager();
+            IIOManager ioManager = appRuntimeContext.getIOManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
-
             //set log reader to the lowWaterMarkLsn again.
             logReader.initializeScan(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -346,14 +351,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+                            String resourceAbsolutePath = localResource.getResourcePath();
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        localResource.getResourceName(), localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                        resourceAbsolutePath, localResource.getPartition());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
                                 //#. get maxDiskLastLSN
                                 ILSMIndex lsmIndex = index;
@@ -387,7 +393,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //close all indexes
             Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
             for (long r : resourceIdList) {
-                datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
             }
 
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -501,6 +507,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        IIOManager ioManager = appRuntimeContext.getIOManager();
+        SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
+                .getAppContext()).getMetadataProperties().getClusterPartitions();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
         Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
@@ -552,12 +561,23 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+                            //get the resource path relative to this node
+                            int resourcePartition = localResource.getPartition();
+                            //get partition io device id
+                            //NOTE:
+                            //currently we store all partition in the same IO device in all nodes. If this changes,
+                            //this needs to be updated to find the IO device in which the partition is stored in this local node.
+                            int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
+                            String resourceAbsolutePath = ioManager
+                                    .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
+                                    .getAbsolutePath();
+                            localResource.setResourcePath(resourceAbsolutePath);
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        localResource.getResourceName(), localResource.getPartition());
+                                        resourceAbsolutePath, localResource.getPartition());
                                 datasetLifecycleManager.register(localResource.getResourceName(), index);
                                 datasetLifecycleManager.open(localResource.getResourceName());
 
@@ -595,7 +615,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         //close all indexes
         Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
         for (long r : resourceIdList) {
-            datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+            datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
         }
 
         if (LOGGER.isLoggable(Level.INFO)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index e84135e..4664fb1 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -30,13 +30,26 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.MasterNode;
+import org.apache.asterix.event.schema.yarnCluster.Node;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -73,12 +86,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.MasterNode;
-import org.apache.asterix.event.schema.yarnCluster.Node;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -1041,14 +1048,12 @@ public class AsterixApplicationMaster {
                     if (iodevice == null) {
                         iodevice = clusterDesc.getIodevices();
                     }
-                    String storageSuffix = local.getStore() == null ? clusterDesc.getStore() : local.getStore();
-                    String storagePath = iodevice + File.separator + storageSuffix;
                     vargs.add(ncJavaOpts);
                     vargs.add(NC_CLASSNAME);
                     vargs.add("-app-nc-main-class org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
                     vargs.add("-node-id " + local.getId());
                     vargs.add("-cc-host " + cC.getClusterIp());
-                    vargs.add("-iodevices " + storagePath);
+                    vargs.add("-iodevices " + iodevice);
                     vargs.add("-cluster-net-ip-address " + local.getClusterIp());
                     vargs.add("-data-ip-address " + local.getClusterIp());
                     vargs.add("-result-ip-address " + local.getClusterIp());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index dc61506..f1123aa 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -38,6 +38,12 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Coredump;
+import org.apache.asterix.common.configuration.Store;
+import org.apache.asterix.common.configuration.TransactionLogDir;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.Node;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -76,13 +82,6 @@ import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.asterix.common.configuration.AsterixConfiguration;
-import org.apache.asterix.common.configuration.Coredump;
-import org.apache.asterix.common.configuration.Store;
-import org.apache.asterix.common.configuration.TransactionLogDir;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class AsterixYARNClient {
@@ -113,13 +112,13 @@ public class AsterixYARNClient {
         }
     }
 
-    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
-            .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
-            .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
-            .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
-            .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
-            .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
-            .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap.<String, AsterixYARNClient
+            .Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL).put(Mode.START.alias, Mode.START)
+            .put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL).put(Mode.DESTROY.alias, Mode.DESTROY)
+            .put(Mode.ALTER.alias, Mode.ALTER).put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL)
+            .put(Mode.DESCRIBE.alias, Mode.DESCRIBE).put(Mode.BACKUP.alias, Mode.BACKUP)
+            .put(Mode.LSBACKUP.alias, Mode.LSBACKUP).put(Mode.RMBACKUP.alias, Mode.RMBACKUP)
+            .put(Mode.RESTORE.alias, Mode.RESTORE).build();
     private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
     public static final String CONF_DIR_REL = ".asterix" + File.separator;
     private static final String instanceLock = "instance";
@@ -223,8 +222,8 @@ public class AsterixYARNClient {
                 }
                 break;
             case KILL:
-                if (client.isRunning() &&
-                    Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+                if (client.isRunning() && Utils.confirmAction(
+                        "Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
                     try {
                         AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
                     } catch (ApplicationNotFoundException e) {
@@ -232,8 +231,7 @@ public class AsterixYARNClient {
                         System.out.println("Asterix instance by that name already exited or was never started");
                         client.deleteLockFile();
                     }
-                }
-                else if(!client.isRunning()){
+                } else if (!client.isRunning()) {
                     System.out.println("Asterix instance by that name already exited or was never started");
                     client.deleteLockFile();
                 }
@@ -254,8 +252,8 @@ public class AsterixYARNClient {
                 break;
             case DESTROY:
                 try {
-                    if (client.force
-                            || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+                    if (client.force || Utils.confirmAction(
+                            "Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
                         app = client.makeApplicationContext();
                         res = client.deployConfig();
                         res.addAll(client.distributeBinaries());
@@ -289,7 +287,8 @@ public class AsterixYARNClient {
                 }
                 break;
             default:
-                LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+                LOG.fatal(
+                        "Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
                 client.printUsage();
                 System.exit(-1);
         }
@@ -366,8 +365,8 @@ public class AsterixYARNClient {
                 "Amount of memory in MB to be requested to run the application master"));
         opts.addOption(new Option("log_properties", true, "log4j.properties file"));
         opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
-        opts.addOption(new Option("zip", "asterixZip", true,
-                "zip file with AsterixDB inside- if in non-default location"));
+        opts.addOption(
+                new Option("zip", "asterixZip", true, "zip file with AsterixDB inside- if in non-default location"));
         opts.addOption(new Option("bc", "baseConfig", true,
                 "base Asterix parameters configuration file if not in default position"));
         opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
@@ -381,13 +380,13 @@ public class AsterixYARNClient {
                 "Backup timestamp for arguments requiring a specific backup (rm, restore)"));
         opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
         opts.addOption(new Option("help", false, "Print usage"));
-        opts.addOption(new Option("f", "force", false,
-                "Execute this command as fully as possible, disregarding any caution"));
+        opts.addOption(
+                new Option("f", "force", false, "Execute this command as fully as possible, disregarding any caution"));
         return opts;
     }
 
     /**
-   */
+    */
     public AsterixYARNClient() throws Exception {
         this(new YarnConfiguration());
     }
@@ -589,8 +588,9 @@ public class AsterixYARNClient {
             try {
                 ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
                 YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
-                if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
-                        && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+                if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED
+                        || prevStatus == YarnApplicationState.FINISHED) && mode != Mode.DESTROY && mode != Mode.BACKUP
+                        && mode != Mode.RESTORE) {
                     throw new IllegalStateException("Instance is already running in: " + lockAppId);
                 } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
                     //stale lock file
@@ -598,7 +598,8 @@ public class AsterixYARNClient {
                     deleteLockFile();
                 }
             } catch (YarnException e) {
-                LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+                LOG.warn(
+                        "Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
                 deleteLockFile();
             }
         }
@@ -690,6 +691,7 @@ public class AsterixYARNClient {
 
     /**
      * Upload External libraries and functions to HDFS for an instance to use when started
+     * 
      * @throws IllegalStateException
      * @throws IOException
      */
@@ -700,8 +702,8 @@ public class AsterixYARNClient {
             throw new IllegalStateException("No instance by name " + instanceName + " found.");
         }
         if (isRunning()) {
-            throw new IllegalStateException("Instance " + instanceName
-                    + " is running. Please stop it before installing any libraries.");
+            throw new IllegalStateException(
+                    "Instance " + instanceName + " is running. Please stop it before installing any libraries.");
         }
         String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
                 + Path.SEPARATOR;
@@ -714,6 +716,7 @@ public class AsterixYARNClient {
 
     /**
      * Finds the minimal classes and JARs needed to start the AM only.
+     * 
      * @return Resources the AM needs to start on the initial container.
      * @throws IllegalStateException
      * @throws IOException
@@ -771,7 +774,9 @@ public class AsterixYARNClient {
 
     /**
      * Uploads a AsterixDB cluster configuration to HDFS for the AM to use.
-     * @param overwrite Overwrite existing configurations by the same name.
+     *
+     * @param overwrite
+     *            Overwrite existing configurations by the same name.
      * @throws IllegalStateException
      * @throws IOException
      */
@@ -791,6 +796,7 @@ public class AsterixYARNClient {
 
     /**
      * Uploads binary resources to HDFS for use by the AM
+     *
      * @return
      * @throws IOException
      * @throws YarnException
@@ -862,7 +868,7 @@ public class AsterixYARNClient {
 
     /**
      * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
-     * 
+     *
      * @param app
      *            The application attempt handle.
      * @param resources
@@ -954,14 +960,11 @@ public class AsterixYARNClient {
         }
         if (mode == Mode.DESTROY) {
             vargs.add("-obliterate");
-        }
-        else if (mode == Mode.BACKUP) {
+        } else if (mode == Mode.BACKUP) {
             vargs.add("-backup");
-        }
-        else if (mode == Mode.RESTORE) {
+        } else if (mode == Mode.RESTORE) {
             vargs.add("-restore " + snapName);
-        }
-        else if( mode == Mode.INSTALL){
+        } else if (mode == Mode.INSTALL) {
             vargs.add("-initial ");
         }
         if (refresh) {
@@ -1036,8 +1039,11 @@ public class AsterixYARNClient {
 
     /**
      * Asks YARN to kill a given application by appId
-     * @param appId The application to kill.
-     * @param yarnClient The YARN client object that is connected to the RM.
+     *
+     * @param appId
+     *            The application to kill.
+     * @param yarnClient
+     *            The YARN client object that is connected to the RM.
      * @throws YarnException
      * @throws IOException
      */
@@ -1064,11 +1070,11 @@ public class AsterixYARNClient {
 
     /**
      * Tries to stop a running AsterixDB instance gracefully.
+     *
      * @throws IOException
      * @throws YarnException
      */
-    private void stopInstanceIfRunning()
-            throws IOException, YarnException {
+    private void stopInstanceIfRunning() throws IOException, YarnException {
         FileSystem fs = FileSystem.get(conf);
         String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
         Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
@@ -1086,20 +1092,24 @@ public class AsterixYARNClient {
 
     /**
      * Start a YARN job to delete local AsterixDB resources of an extant instance
-     * @param app The Client connection
-     * @param resources AM resources
+     *
+     * @param app
+     *            The Client connection
+     * @param resources
+     *            AM resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         FileSystem fs = FileSystem.get(conf);
         //if the instance is up, fix that
         stopInstanceIfRunning();
         //now try deleting all of the on-disk artifacts on the cluster
         ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
-        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start",
+                ccRestPort);
         if (!delete_start) {
             if (force) {
                 fs.delete(new Path(CONF_DIR_REL + instanceFolder), true);
@@ -1121,19 +1131,20 @@ public class AsterixYARNClient {
 
     /**
      * Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS
+     *
      * @param app
      * @param resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         stopInstanceIfRunning();
         ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
         boolean backupStart;
-        backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper.toString()
-                + "to start", ccRestPort);
+        backupStart = Utils.waitForApplication(backerUpper, yarnClient,
+                "Waiting for backup " + backerUpper.toString() + "to start", ccRestPort);
         if (!backupStart) {
             LOG.fatal("Backup failed to start");
             throw new YarnException();
@@ -1149,17 +1160,19 @@ public class AsterixYARNClient {
 
     /**
      * Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant AsterixDB instance
+     *
      * @param app
      * @param resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         stopInstanceIfRunning();
         ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
-        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort);
+        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start",
+                ccRestPort);
         if (!restoreStart) {
             LOG.fatal("Restore failed to start");
             throw new YarnException();
@@ -1174,7 +1187,7 @@ public class AsterixYARNClient {
 
     /**
      * Stops the instance and remove the lockfile to allow a restart.
-     * 
+     *
      * @throws IOException
      * @throws JAXBException
      * @throws YarnException
@@ -1194,7 +1207,7 @@ public class AsterixYARNClient {
         }
         try {
             String ccIp = Utils.getCCHostname(instanceName, conf);
-            Utils.sendShutdownCall(ccIp,ccRestPort);
+            Utils.sendShutdownCall(ccIp, ccRestPort);
         } catch (IOException e) {
             LOG.error("Error while trying to issue safe shutdown:", e);
         }
@@ -1207,7 +1220,7 @@ public class AsterixYARNClient {
                 AsterixYARNClient.killApplication(appId, yarnClient);
                 completed = true;
             } catch (YarnException e1) {
-                LOG.fatal("Could not stop nor kill instance gracefully.",e1);
+                LOG.fatal("Could not stop nor kill instance gracefully.", e1);
                 return;
             }
         }
@@ -1278,11 +1291,12 @@ public class AsterixYARNClient {
 
     /**
      * Locate the Asterix parameters file.
-      * @return
+     *
+     * @return
      * @throws FileNotFoundException
      * @throws IOException
      */
-    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException {
         AsterixConfiguration configuration;
         String configPathBase = MERGED_PARAMETERS_PATH;
         if (baseConfig != null) {
@@ -1299,14 +1313,14 @@ public class AsterixYARNClient {
     /**
      *
      */
-    private void readConfigParams(AsterixConfiguration configuration){
+    private void readConfigParams(AsterixConfiguration configuration) {
         //this is the "base" config that is inside the zip, we start here
         for (org.apache.asterix.common.configuration.Property property : configuration.getProperty()) {
             if (property.getName().equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
                 ccJavaOpts = property.getValue();
             } else if (property.getName().equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
                 ncJavaOpts = property.getValue();
-            } else if(property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)){
+            } else if (property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)) {
                 ccRestPort = Integer.parseInt(property.getValue());
             }
 
@@ -1315,6 +1329,7 @@ public class AsterixYARNClient {
 
     /**
      * Retrieves necessary information from the cluster configuration and splices it into the Asterix configuration parameters
+     *
      * @param cluster
      * @throws FileNotFoundException
      * @throws IOException
@@ -1332,14 +1347,20 @@ public class AsterixYARNClient {
         configuration.setVersion(version);
 
         configuration.setInstanceName(asterixInstanceName);
-        String storeDir = null;
         List<Store> stores = new ArrayList<Store>();
+        String storeDir = cluster.getStore().trim();
         for (Node node : cluster.getNode()) {
-            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-            stores.add(new Store(node.getId(), storeDir));
+            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIdDevice = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIdDevice.length; i++) {
+                nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+            }
+            //remove last comma
+            nodeStores.deleteCharAt(nodeStores.length() - 1);
+            stores.add(new Store(node.getId(), nodeStores.toString()));
         }
         configuration.setStore(stores);
-
         List<Coredump> coredump = new ArrayList<Coredump>();
         String coredumpDir = null;
         List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
@@ -1354,7 +1375,6 @@ public class AsterixYARNClient {
                     + File.separator));
         }
         configuration.setMetadataNode(metadataNodeId);
-
         configuration.setCoredump(coredump);
         configuration.setTransactionLogDir(txnLogDirs);
         FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);


[3/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions

Posted by mh...@apache.org.
Divide Cluster into Unique Partitions

The change includes the following:
- Fix passing NC stores to AsterixConfiguration.
- Unify storage direcotry name in the instance level rather than the node level.
- Divide the cluster into unique storage partitions based on the number of stores.
- Refactored FileSplits and moved out of AqlMetadataProvider.
- Make AsterixHyracksIntegrationUtil use the passed configuration file.
- Make File Splits pass relative index paths of partitions rather than absolute paths.
- Remove unused AqlCompiledMetadataDeclarations class.

Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
Reviewed-on: https://asterix-gerrit.ics.uci.edu/564
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/1d5cf640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/1d5cf640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/1d5cf640

Branch: refs/heads/master
Commit: 1d5cf6403c5c8a2a0b5bf4c8f64b34ecbf1ccc30
Parents: e3e1373
Author: Murtadha Hubail <mh...@uci.edu>
Authored: Thu Dec 31 08:38:15 2015 -0800
Committer: Murtadha Hubail <hu...@gmail.com>
Committed: Thu Dec 31 17:12:53 2015 -0800

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |   6 +-
 .../common/AsterixHyracksIntegrationUtil.java   |  67 +++--
 .../asterix/aql/translator/QueryTranslator.java |   1 -
 .../bootstrap/NCApplicationEntryPoint.java      |  39 +--
 .../resources/asterix-build-configuration.xml   |   4 +-
 .../asterix/test/metadata/MetadataTest.java     |   5 -
 .../asterix/test/runtime/ExecutionTest.java     |   4 -
 .../test/runtime/SqlppExecutionTest.java        |   5 +-
 .../common/cluster/ClusterPartition.java        |  77 ++++++
 .../config/AsterixMetadataProperties.java       |  15 +-
 .../config/AsterixPropertiesAccessor.java       |  38 ++-
 .../common/context/DatasetLifecycleManager.java |  42 +--
 ...erixLSMInsertDeleteOperatorNodePushable.java |   2 +-
 .../src/main/resources/schema/cluster.xsd       |   1 -
 .../src/main/resources/schema/yarn_cluster.xsd  |   3 -
 .../apache/asterix/test/aql/TestExecutor.java   |  52 ++--
 .../asterix/event/driver/EventDriver.java       |   2 +-
 .../asterix/event/management/EventUtil.java     |   2 +-
 .../event/service/AsterixEventServiceUtil.java  |  85 +++---
 .../asterix/event/util/PatternCreator.java      |  44 ++-
 .../adapter/factory/HDFSAdapterFactory.java     | 111 ++++----
 .../factory/HDFSIndexingAdapterFactory.java     |   6 +-
 ...alDatasetIndexesAbortOperatorDescriptor.java |   5 +-
 ...lDatasetIndexesCommitOperatorDescriptor.java |  10 +-
 ...DatasetIndexesRecoverOperatorDescriptor.java |   4 +-
 .../installer/command/ValidateCommand.java      |  22 +-
 .../asterix/installer/driver/InstallerUtil.java |  13 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  34 ++-
 .../AqlCompiledMetadataDeclarations.java        | 276 -------------------
 .../declared/AqlLogicalPlanAndMetadataImpl.java |  16 +-
 .../metadata/declared/AqlMetadataProvider.java  | 164 +----------
 .../utils/SplitsAndConstraintsUtil.java         | 173 ++++++++++++
 .../om/util/AsterixClusterProperties.java       | 112 ++++++--
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 .../PersistentLocalResourceFactory.java         |   4 +-
 .../PersistentLocalResourceRepository.java      | 108 ++++----
 .../service/recovery/RecoveryManager.java       |  38 ++-
 .../asterix/aoya/AsterixApplicationMaster.java  |  25 +-
 .../apache/asterix/aoya/AsterixYARNClient.java  | 154 ++++++-----
 42 files changed, 844 insertions(+), 941 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 45c0598..4a8a323 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -187,7 +187,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
             String nodeId = ncApplicationContext.getNodeId();
 
             replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+                    AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
+                    replicationProperties.getReplicationStore());
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -377,7 +378,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
     @Override
     public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
-                .createResourceIdFactory();
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 0145651..d7842e8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Set;
 
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -41,22 +40,18 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 public class AsterixHyracksIntegrationUtil {
 
     private static final String IO_DIR_KEY = "java.io.tmpdir";
-    public static final int NODES = 2;
-    public static final int PARTITONS = 2;
-
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
-
     public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
 
     public static ClusterControllerService cc;
-    public static NodeControllerService[] ncs = new NodeControllerService[NODES];
+    public static NodeControllerService[] ncs;
     public static IHyracksClientConnection hcc;
 
-    protected static AsterixTransactionProperties txnProperties;
+    private static AsterixPropertiesAccessor propertiesAccessor;
 
     public static void init(boolean deleteOldInstanceData) throws Exception {
-        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
-        txnProperties = new AsterixTransactionProperties(apa);
+        propertiesAccessor = new AsterixPropertiesAccessor();
+        ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
         if (deleteOldInstanceData) {
             deleteTransactionLogs();
             removeTestStorageFiles();
@@ -77,7 +72,8 @@ public class AsterixHyracksIntegrationUtil {
 
         // Starts ncs.
         int n = 0;
-        for (String ncName : getNcNames()) {
+        Set<String> nodes = propertiesAccessor.getNodeNames();
+        for (String ncName : nodes) {
             NCConfig ncConfig1 = new NCConfig();
             ncConfig1.ccHost = "localhost";
             ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
@@ -87,13 +83,28 @@ public class AsterixHyracksIntegrationUtil {
             ncConfig1.nodeId = ncName;
             ncConfig1.resultTTL = 30000;
             ncConfig1.resultSweepThreshold = 1000;
-            for (int p = 0; p < PARTITONS; ++p) {
+            String tempPath = System.getProperty(IO_DIR_KEY);
+            if (tempPath.endsWith(File.separator)) {
+                tempPath = tempPath.substring(0, tempPath.length() - 1);
+            }
+            //get initial partitions from properties
+            String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+            if (nodeStores == null) {
+                throw new Exception("Coudn't find stores for NC: " + ncName);
+            }
+            String tempDirPath = System.getProperty(IO_DIR_KEY);
+            if (!tempDirPath.endsWith(File.separator)) {
+                tempDirPath += File.separator;
+            }
+            for (int p = 0; p < nodeStores.length; p++) {
+                //create IO devices based on stores
+                String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
+                File ioDeviceDir = new File(iodevicePath);
+                ioDeviceDir.mkdirs();
                 if (p == 0) {
-                    ncConfig1.ioDevices = System.getProperty("java.io.tmpdir") + File.separator + ncConfig1.nodeId
-                            + "/iodevice" + p;
+                    ncConfig1.ioDevices = iodevicePath;
                 } else {
-                    ncConfig1.ioDevices += "," + System.getProperty("java.io.tmpdir") + File.separator
-                            + ncConfig1.nodeId + "/iodevice" + p;
+                    ncConfig1.ioDevices += "," + iodevicePath;
                 }
             }
             ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
@@ -105,19 +116,7 @@ public class AsterixHyracksIntegrationUtil {
     }
 
     public static String[] getNcNames() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1);
-        }
-        return names;
-    }
-
-    public static String[] getDataDirs() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1) + "data";
-        }
-        return names;
+        return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
     }
 
     public static IHyracksClientConnection getHyracksClientConnection() {
@@ -147,17 +146,17 @@ public class AsterixHyracksIntegrationUtil {
         hcc.waitForCompletion(jobId);
     }
 
-    private static void removeTestStorageFiles() throws IOException {
+    public static void removeTestStorageFiles() {
         File dir = new File(System.getProperty(IO_DIR_KEY));
-        for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+        for (String ncName : propertiesAccessor.getNodeNames()) {
             File ncDir = new File(dir, ncName);
             FileUtils.deleteQuietly(ncDir);
         }
     }
 
     private static void deleteTransactionLogs() throws Exception {
-        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
-            File log = new File(txnProperties.getLogDirectory(ncId));
+        for (String ncId : propertiesAccessor.getNodeNames()) {
+            File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
             if (log.exists()) {
                 FileUtils.deleteDirectory(log);
             }
@@ -185,7 +184,7 @@ public class AsterixHyracksIntegrationUtil {
         try {
             System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 
-            init(false);
+            init(true);
             while (true) {
                 Thread.sleep(10000);
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 714e05c..08b92e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -1246,7 +1246,6 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
             //#. mark PendingDropOp on the dataverse record by
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 147a356..496c2f8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -46,11 +46,11 @@ import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.AsterixFilesUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -202,7 +202,6 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
-
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -213,7 +212,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
             PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
                     .getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
+            localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
         IAsterixStateProxy proxy = null;
@@ -277,22 +276,18 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         performLocalCleanUp();
     }
 
-    private void performLocalCleanUp() throws IOException {
+    private void performLocalCleanUp() {
         //delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
         //reclaim storage for temporary datasets.
-        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-
-        String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
-        String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
-                .get(nodeId)[0];
-
-        for (String mountPoint : storageMountingPoints) {
-            String tempDatasetFolder = mountPoint + storageFolderName + File.separator
-                    + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
-            AsterixFilesUtil.deleteFolder(tempDatasetFolder);
+        //get node stores
+        String[] nodeStores = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+                .get(nodeId);
+        for (String store : nodeStores) {
+            String tempDatasetFolder = store + File.separator
+                    + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            FileUtils.deleteQuietly(new File(tempDatasetFolder));
         }
 
         // TODO
@@ -309,7 +304,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             if (cluster == null) {
                 throw new IllegalStateException("No cluster configuration found for this instance");
             }
-            String asterixInstanceName = cluster.getInstanceName();
+            String asterixInstanceName = metadataProperties.getInstanceName();
             AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
                     .getTransactionProperties();
             Node self = null;
@@ -322,8 +317,14 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             for (Node node : nodes) {
                 String ncId = asterixInstanceName + "_" + node.getId();
                 if (ncId.equalsIgnoreCase(nodeId)) {
-                    String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-                    metadataProperties.getStores().put(nodeId, storeDir.split(","));
+                    String storeDir = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+                    String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+                    String[] ioDevicePaths = nodeIoDevices.trim().split(",");
+                    for (int i = 0; i < ioDevicePaths.length; i++) {
+                        //construct full store path
+                        ioDevicePaths[i] += File.separator + storeDir;
+                    }
+                    metadataProperties.getStores().put(nodeId, ioDevicePaths);
 
                     String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
                     metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index fa20099..731113b 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,11 +20,11 @@
 	<metadataNode>asterix_nc1</metadataNode>
 	<store>
 		<ncId>asterix_nc1</ncId>
-		<storeDirs>asterix_nc1data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<store>
 		<ncId>asterix_nc2</ncId>
-		<storeDirs>asterix_nc2data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<transactionLogDir>
 		<ncId>asterix_nc1</ncId>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 376b2ff..6c6e411 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -65,11 +65,6 @@ public class MetadataTest {
         if (files == null || files.length == 0) {
             outdir.delete();
         }
-
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 052e0be..7a55c90 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -68,10 +68,6 @@ public class ExecutionTest {
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index d5f4db3..22a3ad7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -65,10 +65,7 @@ public class SqlppExecutionTest {
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
+        AsterixHyracksIntegrationUtil.removeTestStorageFiles();
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
new file mode 100644
index 0000000..6cd44a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ClusterPartition implements Cloneable {
+    private final int partitionId;
+    private final String nodeId;
+    private final int ioDeviceNum;
+    private String activeNodeId = null;
+    private boolean active = false;
+
+    public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
+        this.partitionId = partitionId;
+        this.nodeId = nodeId;
+        this.ioDeviceNum = ioDeviceNum;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getIODeviceNum() {
+        return ioDeviceNum;
+    }
+
+    public String getActiveNodeId() {
+        return activeNodeId;
+    }
+
+    public void setActiveNodeId(String activeNodeId) {
+        this.activeNodeId = activeNodeId;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    @Override
+    public ClusterPartition clone() {
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        return clone;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ID:" + partitionId);
+        sb.append(" Original Node: " + nodeId);
+        sb.append(" IODevice: " + ioDeviceNum);
+        sb.append(" Active Node: " + activeNodeId);
+        return sb.toString();
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index adaca46..8e2c4e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -20,6 +20,9 @@ package org.apache.asterix.common.config;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
 
 public class AsterixMetadataProperties extends AbstractAsterixProperties {
 
@@ -35,8 +38,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
         return accessor.getMetadataNodeName();
     }
 
-    public String getMetadataStore() {
-        return accessor.getMetadataStore();
+    public ClusterPartition getMetadataPartition() {
+        return accessor.getMetadataPartiton();
     }
 
     public Map<String, String[]> getStores() {
@@ -54,4 +57,12 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
     public Map<String, String> getCoredumpPaths() {
         return accessor.getCoredumpConfig();
     }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return accessor.getNodePartitions();
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return accessor.getClusterPartitions();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index d6c81ab..cc7ec84 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -35,6 +37,7 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Property;
@@ -53,12 +56,15 @@ public class AsterixPropertiesAccessor {
     private final Map<String, Property> asterixConfigurationParams;
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
+    private final Map<String, ClusterPartition[]> nodePartitionsMap;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
         if (fileName == null) {
             fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
         }
+
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
         if (is == null) {
             try {
@@ -82,9 +88,20 @@ public class AsterixPropertiesAccessor {
         stores = new HashMap<String, String[]>();
         List<Store> configuredStores = asterixConfiguration.getStore();
         nodeNames = new HashSet<String>();
+        nodePartitionsMap = new HashMap<>();
+        clusterPartitions = new TreeMap<>();
+        int uniquePartitionId = 0;
         for (Store store : configuredStores) {
             String trimmedStoreDirs = store.getStoreDirs().trim();
-            stores.put(store.getNcId(), trimmedStoreDirs.split(","));
+            String[] nodeStores = trimmedStoreDirs.split(",");
+            ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
+            for (int i = 0; i < nodePartitions.length; i++) {
+                ClusterPartition partition = new ClusterPartition(uniquePartitionId++, store.getNcId(), i);
+                clusterPartitions.put(partition.getPartitionId(), partition);
+                nodePartitions[i] = partition;
+            }
+            stores.put(store.getNcId(), nodeStores);
+            nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
         }
         asterixConfigurationParams = new HashMap<String, Property>();
@@ -116,10 +133,6 @@ public class AsterixPropertiesAccessor {
         return metadataNodeName;
     }
 
-    public String getMetadataStore() {
-        return stores.get(metadataNodeName)[0];
-    }
-
     public Map<String, String[]> getStores() {
         return stores;
     }
@@ -172,7 +185,7 @@ public class AsterixPropertiesAccessor {
         }
     }
 
-    private <T> void logConfigurationError(Property p, T defaultValue) {
+    private static <T> void logConfigurationError(Property p, T defaultValue) {
         if (LOGGER.isLoggable(Level.SEVERE)) {
             LOGGER.severe("Invalid property value '" + p.getValue() + "' for property '" + p.getName()
                     + "'.\n See the description: \n" + p.getDescription() + "\nDefault = " + defaultValue);
@@ -182,4 +195,17 @@ public class AsterixPropertiesAccessor {
     public String getInstanceName() {
         return instanceName;
     }
+
+    public ClusterPartition getMetadataPartiton() {
+        //metadata partition is always the first partition on the metadata node
+        return nodePartitionsMap.get(metadataNodeName)[0];
+    }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return nodePartitionsMap;
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return clusterPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index adf1152..5062d06 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -78,9 +78,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
-        int datasetID = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+        int datasetID = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         return getIndex(datasetID, resourceID);
     }
 
@@ -98,9 +98,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             dsInfo = getDatasetInfo(did);
@@ -116,16 +116,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
     }
 
-    public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
-    public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
@@ -133,9 +133,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void unregister(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -180,9 +180,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void open(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void open(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null || !dsInfo.isRegistered) {
@@ -262,9 +262,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void close(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void close(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
@@ -704,9 +704,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+    public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
         //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourceName);
+        int did = Integer.parseInt(resourcePath);
         allocateDatasetMemory(did);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 51168ae..fd1ebb8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         try {
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d3203d5..872c959 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -113,7 +113,6 @@
 				<xs:element ref="cl:java_home" minOccurs="0" />
 				<xs:element ref="cl:log_dir" minOccurs="0" />
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
-				<xs:element ref="cl:store" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
 			</xs:sequence>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/yarn_cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index f54cf90..8827985 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -138,9 +138,6 @@
                     ref="cl:txn_log_dir"
                     minOccurs="0" />
                 <xs:element
-                    ref="cl:store"
-                    minOccurs="0" />
-                <xs:element
                     ref="cl:iodevices"
                     minOccurs="0" />
                 <xs:element

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d04a3dd..d8147b6 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -71,7 +71,7 @@ public class TestExecutor {
         this.port = 19002;
     }
 
-    public TestExecutor(String host, int port){
+    public TestExecutor(String host, int port) {
         this.host = host;
         this.port = port;
     }
@@ -225,12 +225,16 @@ public class TestExecutor {
             // In future this may be changed depending on the requested
             // output format sent to the servlet.
             String errorBody = method.getResponseBodyAsString();
-            JSONObject result = new JSONObject(errorBody);
-            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
-                    result.getString("stacktrace") };
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
-            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
-                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            try {
+                JSONObject result = new JSONObject(errorBody);
+                String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+                        result.getString("stacktrace") };
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+                throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+                        + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            } catch (Exception e) {
+                throw new Exception(errorBody);
+            }
         }
         return statusCode;
     }
@@ -303,7 +307,7 @@ public class TestExecutor {
     }
 
     private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = "http://"+host+":"+port+"/query/result";
+        final String url = "http://" + host + ":" + port + "/query/result";
 
         // Create a method instance.
         GetMethod method = new GetMethod(url);
@@ -430,9 +434,9 @@ public class TestExecutor {
                     switch (ctx.getType()) {
                         case "ddl":
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } else {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl/sqlpp");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
                             }
                             break;
                         case "update":
@@ -442,9 +446,9 @@ public class TestExecutor {
                                         "127.0.0.1://../../../../../../asterix-app/");
                             }
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } else {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update/sqlpp");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
                             }
                             break;
                         case "query":
@@ -461,25 +465,25 @@ public class TestExecutor {
                             OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
                             if (ctx.getFile().getName().endsWith("aql")) {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 }
                             } else {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query/sqlpp",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 }
                             }
 
@@ -506,7 +510,7 @@ public class TestExecutor {
                             break;
                         case "txnqbc": //qbc represents query before crash
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qbcFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qbc.adm");
@@ -515,7 +519,7 @@ public class TestExecutor {
                             break;
                         case "txnqar": //qar represents query after recovery
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qarFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qar.adm");
@@ -528,7 +532,7 @@ public class TestExecutor {
                             break;
                         case "txneu": //eu represents erroneous update
                             try {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } catch (Exception e) {
                                 //An exception is expected.
                                 failed = true;
@@ -556,7 +560,7 @@ public class TestExecutor {
                             break;
                         case "errddl": // a ddlquery that expects error
                             try {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } catch (Exception e) {
                                 // expected error happens
                                 failed = true;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index c92262c..29765fd 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@ import org.kohsuke.args4j.CmdLineParser;
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index d6e7da0..b83faa2 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -191,7 +191,7 @@ public class EventUtil {
             String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
                     .getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort());
         }
 
         List<Node> nodeList = cluster.getNode();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 33ba787..4bd5098 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -45,7 +45,6 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Store;
@@ -59,6 +58,7 @@ import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Env;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.event.schema.cluster.Property;
+import org.apache.commons.io.IOUtils;
 
 public class AsterixEventServiceUtil {
 
@@ -90,8 +90,8 @@ public class AsterixEventServiceUtil {
         return instance;
     }
 
-    public static void createAsterixZip(AsterixInstance asterixInstance) throws IOException, InterruptedException,
-            JAXBException, EventException {
+    public static void createAsterixZip(AsterixInstance asterixInstance)
+            throws IOException, InterruptedException, JAXBException, EventException {
         String asterixInstanceDir = asterixInstanceDir(asterixInstance);
         unzip(AsterixEventService.getAsterixZip(), asterixInstanceDir);
 
@@ -128,18 +128,18 @@ public class AsterixEventServiceUtil {
 
         clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
         clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
-        clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
-                + "asterix"));
+        clusterProperties
+                .add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator + "asterix"));
         clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
         clusterProperties.add(new Property("JAVA_HOME", cluster.getJavaHome()));
         clusterProperties.add(new Property("WORKING_DIR", cluster.getWorkingDir().getDir()));
         clusterProperties.add(new Property("CLIENT_NET_IP", cluster.getMasterNode().getClientIp()));
         clusterProperties.add(new Property("CLUSTER_NET_IP", cluster.getMasterNode().getClusterIp()));
 
-        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null ? cluster.getMasterNode()
-                .getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
-        int clientNetPort = cluster.getMasterNode().getClientPort() != null ? cluster.getMasterNode().getClientPort()
-                .intValue() : CLIENT_NET_PORT_DEFAULT;
+        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null
+                ? cluster.getMasterNode().getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
+        int clientNetPort = cluster.getMasterNode().getClientPort() != null
+                ? cluster.getMasterNode().getClientPort().intValue() : CLIENT_NET_PORT_DEFAULT;
         int httpPort = cluster.getMasterNode().getHttpPort() != null ? cluster.getMasterNode().getHttpPort().intValue()
                 : HTTP_PORT_DEFAULT;
 
@@ -151,8 +151,8 @@ public class AsterixEventServiceUtil {
     }
 
     private static String asterixZipName() {
-        return AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        return AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
     }
 
     private static String asterixJarPath(AsterixInstance asterixInstance, String asterixInstanceDir) {
@@ -174,8 +174,8 @@ public class AsterixEventServiceUtil {
         new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
     }
 
-    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
-            throws IOException, EventException, JAXBException {
+    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir,
+            AsterixInstance asterixInstance) throws IOException, EventException, JAXBException {
         File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
         writeAsterixClusterConfigurationFile(asterixInstance);
 
@@ -185,8 +185,8 @@ public class AsterixEventServiceUtil {
         new File(asterixInstanceDir + File.separator + CLUSTER_CONFIGURATION_FILE).delete();
     }
 
-    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            EventException, JAXBException {
+    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, EventException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
 
@@ -197,8 +197,8 @@ public class AsterixEventServiceUtil {
                 + asterixInstanceName + File.separator + "cluster.xml"));
     }
 
-    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName,
-            String libraryName, String libraryPath) throws IOException {
+    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName, String libraryName,
+            String libraryPath) throws IOException {
         File instanceDir = new File(asterixInstanceDir(asterixInstance));
         if (!instanceDir.exists()) {
             instanceDir.mkdirs();
@@ -235,30 +235,8 @@ public class AsterixEventServiceUtil {
         return metadataNode;
     }
 
-    public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
-        String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
-        String[] storeDirs = null;
-        StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
-        if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
-        }
-
-        storeDirs = storeDirValue.split(",");
-        for (String ns : storeDirs) {
-            nodeDataStore.append(ns + File.separator + storeDataSubDir.trim());
-            nodeDataStore.append(",");
-        }
-        nodeDataStore.deleteCharAt(nodeDataStore.length() - 1);
-        return nodeDataStore.toString();
-    }
-
-    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            JAXBException {
+    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
         String metadataNodeId = asterixInstance.getMetadataNodeId();
@@ -266,29 +244,34 @@ public class AsterixEventServiceUtil {
         AsterixConfiguration configuration = asterixInstance.getAsterixConfiguration();
         configuration.setInstanceName(asterixInstanceName);
         configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
-        String storeDir = null;
         List<Store> stores = new ArrayList<Store>();
+        String storeDir = cluster.getStore().trim();
         for (Node node : cluster.getNode()) {
-            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-            stores.add(new Store(asterixInstanceName + "_" + node.getId(), storeDir));
+            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIdDevice = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIdDevice.length; i++) {
+                nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+            }
+            //remove last comma
+            nodeStores.deleteCharAt(nodeStores.length() - 1);
+            stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
         }
         configuration.setStore(stores);
-
         List<Coredump> coredump = new ArrayList<Coredump>();
         String coredumpDir = null;
         List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
         String txnLogDir = null;
         for (Node node : cluster.getNode()) {
             coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
-            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir + File.separator
-                    + asterixInstanceName + "_" + node.getId()));
+            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(),
+                    coredumpDir + File.separator + asterixInstanceName + "_" + node.getId()));
 
             txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
             txnLogDirs.add(new TransactionLogDir(asterixInstanceName + "_" + node.getId(), txnLogDir));
         }
         configuration.setCoredump(coredump);
         configuration.setTransactionLogDir(txnLogDirs);
-
         File asterixConfDir = new File(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName);
         asterixConfDir.mkdirs();
 
@@ -300,8 +283,6 @@ public class AsterixEventServiceUtil {
         os.close();
     }
 
-
-
     public static void unzip(String sourceFile, String destDir) throws IOException {
         BufferedOutputStream dest = null;
         FileInputStream fis = new FileInputStream(sourceFile);
@@ -432,8 +413,8 @@ public class AsterixEventServiceUtil {
             }
         }
         if (!valid) {
-            throw new EventException("Asterix instance by the name " + name + " is in " + instance.getState()
-                    + " state ");
+            throw new EventException(
+                    "Asterix instance by the name " + name + " is in " + instance.getState() + " state ");
         }
         return instance;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b6aaddb..6085019 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -164,11 +164,11 @@ public class PatternCreator {
         String store;
         String pargs;
         String iodevices;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
@@ -188,12 +188,12 @@ public class PatternCreator {
         String txnLogDir;
         String store;
         String pargs;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
             txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + txnLogDir + " " + backupId + " " + backupDir
                     + " " + "local" + " " + node.getId();
@@ -212,14 +212,12 @@ public class PatternCreator {
         VerificationUtil.verifyBackupRestoreConfiguration(hdfsUrl, hadoopVersion, hdfsBackupDir);
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
                     + hadoopVersion;
@@ -235,14 +233,12 @@ public class PatternCreator {
         String backupDir = backupInfo.getBackupConf().getBackupDir();
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + backupDir + " " + "local" + " " + node.getId();
             Event event = new Event("restore", nodeid, pargs);
@@ -262,8 +258,8 @@ public class PatternCreator {
 
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp()
-                + " " + workingDir;
+        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp() + " "
+                + workingDir;
         Event event = new Event("directory_transfer", nodeid, pargs);
         Pattern p = new Pattern(null, 1, null, event);
         addInitialDelay(p, 2, "sec");
@@ -428,8 +424,8 @@ public class PatternCreator {
                 patternList.add(p);
             }
 
-            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir
-                    + " " + "unpack";
+            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir + " "
+                    + "unpack";
             event = new Event("file_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             patternList.add(p);
@@ -529,8 +525,8 @@ public class PatternCreator {
             String[] nodeIODevices;
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
             nodeIODevices = iodevices.trim().split(",");
+            String nodeStore = cluster.getStore().trim();
             for (String nodeIODevice : nodeIODevices) {
-                String nodeStore = node.getStore() == null ? cluster.getStore() : node.getStore();
                 pargs = nodeIODevice.trim() + File.separator + nodeStore;
                 Event event = new Event("file_delete", nodeid, pargs);
                 patternList.add(new Pattern(null, 1, null, event));
@@ -540,13 +536,15 @@ public class PatternCreator {
         return patterns;
     }
 
-    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp, String destDir) {
+    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp,
+            String destDir) {
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String asterixZipName = AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
-        String fileToTransfer = new File(AsterixEventService.getAsterixDir() + File.separator + instanceName
-                + File.separator + asterixZipName).getAbsolutePath();
+        String asterixZipName = AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        String fileToTransfer = new File(
+                AsterixEventService.getAsterixDir() + File.separator + instanceName + File.separator + asterixZipName)
+                        .getAbsolutePath();
         String pargs = username + " " + fileToTransfer + " " + destinationIp + " " + destDir + " " + "unpack";
         Event event = new Event("file_transfer", nodeid, pargs);
         return new Pattern(null, 1, null, event);
@@ -607,8 +605,8 @@ public class PatternCreator {
             ps.add(p);
 
             nodeid = new Nodeid(new Value(null, nodeToBeAdded.getId()));
-            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp()
-                    + " " + workingDir;
+            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp() + " "
+                    + workingDir;
             event = new Event("directory_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             addInitialDelay(p, 2, "sec");
@@ -626,8 +624,8 @@ public class PatternCreator {
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
-                .getLogDir();
+        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+                : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);
@@ -649,7 +647,7 @@ public class PatternCreator {
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
-    
+
     private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
 
         List<Pattern> patternList = new ArrayList<Pattern>();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ebf41cc..c4a96f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -34,7 +34,6 @@ import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactor
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import org.apache.hadoop.fs.BlockLocation;
@@ -211,12 +210,10 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                    locs.add(i);
-                }
+                //two readers per partition
+                locs.add(i);
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];
@@ -273,67 +270,67 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
      * @throws IOException
      */
     protected InputSplit[] getSplits(JobConf conf) throws IOException {
-        // Create file system object
-        FileSystem fs = FileSystem.get(conf);
         ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
         ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
-        // Create files splits
-        for (ExternalFile file : files) {
-            Path filePath = new Path(file.getFileName());
-            FileStatus fileStatus;
-            try {
-                fileStatus = fs.getFileStatus(filePath);
-            } catch (FileNotFoundException e) {
-                // file was deleted at some point, skip to next file
-                continue;
-            }
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() < file.getSize()) {
-                        fileSplits
-                                .add(new FileSplit(filePath,
-                                        block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                                ? block.getLength() : (file.getSize() - block.getOffset()),
-                                block.getHosts()));
-                        orderedExternalFiles.add(file);
-                    }
-                }
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                long oldSize = 0L;
-                long newSize = file.getSize();
-                for (int i = 0; i < files.size(); i++) {
-                    if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
-                        newSize = files.get(i).getSize();
-                        oldSize = file.getSize();
-                        break;
-                    }
+        // Create file system object
+        try (FileSystem fs = FileSystem.get(conf)) {
+            // Create files splits
+            for (ExternalFile file : files) {
+                Path filePath = new Path(file.getFileName());
+                FileStatus fileStatus;
+                try {
+                    fileStatus = fs.getFileStatus(filePath);
+                } catch (FileNotFoundException e) {
+                    // file was deleted at some point, skip to next file
+                    continue;
                 }
-
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() + block.getLength() > oldSize) {
-                        if (block.getOffset() < newSize) {
-                            // Block interact with delta -> Create a split
-                            long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
-                            long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
-                                    : block.getOffset() + block.getLength() - newSize;
-                            long splitLength = block.getLength() - startCut - endCut;
-                            fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() < file.getSize()) {
+                            fileSplits.add(new FileSplit(filePath,
+                                    block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+                                            ? block.getLength() : (file.getSize() - block.getOffset()),
                                     block.getHosts()));
                             orderedExternalFiles.add(file);
                         }
                     }
+                } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    long oldSize = 0L;
+                    long newSize = file.getSize();
+                    for (int i = 0; i < files.size(); i++) {
+                        if (files.get(i).getFileName() == file.getFileName()
+                                && files.get(i).getSize() != file.getSize()) {
+                            newSize = files.get(i).getSize();
+                            oldSize = file.getSize();
+                            break;
+                        }
+                    }
+
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() + block.getLength() > oldSize) {
+                            if (block.getOffset() < newSize) {
+                                // Block interact with delta -> Create a split
+                                long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+                                long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+                                        : block.getOffset() + block.getLength() - newSize;
+                                long splitLength = block.getLength() - startCut - endCut;
+                                fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                                        block.getHosts()));
+                                orderedExternalFiles.add(file);
+                            }
+                        }
+                    }
                 }
             }
         }
-        fs.close();
         files = orderedExternalFiles;
         return fileSplits.toArray(new FileSplit[fileSplits.size()]);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 11e2b96..8bf6d93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -32,7 +32,6 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
@@ -186,11 +185,8 @@ public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index bce4620..6ff991b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.indexing.operators;
 
-import java.io.File;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,9 +48,7 @@ public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExter
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.deleteTransactionFiles();
     }


[2/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions

Posted by mh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
index 0eacc15..e89a8db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.indexing.operators;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -49,18 +50,15 @@ public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExte
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        System.err.println("performing the operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()));
+        FileReference resourecePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
+        System.err.println("performing the operation on "+ resourecePath.getFile().getAbsolutePath());
         // Get DataflowHelper
         IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
         // Get index
         IIndex index = indexHelper.getIndexInstance();
         // commit transaction
         ((ITwoPCIndex) index).commitTransaction();
-        System.err.println("operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()) + " Succeded");
+        System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 8e7a288..9bdfaa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,9 +49,7 @@ public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExt
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.recoverTransaction();
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 2254f6f..09c65c8 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -27,8 +27,6 @@ import java.util.Set;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
-import org.kohsuke.args4j.Option;
-
 import org.apache.asterix.event.management.EventUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.MasterNode;
@@ -37,6 +35,7 @@ import org.apache.asterix.event.service.AsterixEventServiceUtil;
 import org.apache.asterix.installer.driver.InstallerDriver;
 import org.apache.asterix.installer.schema.conf.Configuration;
 import org.apache.asterix.installer.schema.conf.Zookeeper;
+import org.kohsuke.args4j.Option;
 
 public class ValidateCommand extends AbstractCommand {
 
@@ -97,7 +96,7 @@ public class ValidateCommand extends AbstractCommand {
             valid = false;
         } else {
             cluster = EventUtil.getCluster(clusterPath);
-            validateClusterProperties(cluster);
+            valid = valid & validateClusterProperties(cluster);
 
             Set<String> servers = new HashSet<String>();
             Set<String> serverIds = new HashSet<String>();
@@ -106,7 +105,7 @@ public class ValidateCommand extends AbstractCommand {
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null, null);
+                    masterNode.getLogDir(), null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
@@ -158,7 +157,7 @@ public class ValidateCommand extends AbstractCommand {
         return true;
     }
 
-    private void validateClusterProperties(Cluster cluster) {
+    private boolean validateClusterProperties(Cluster cluster) {
         List<String> tempDirs = new ArrayList<String>();
         if (cluster.getLogDir() != null && checkTemporaryPath(cluster.getLogDir())) {
             tempDirs.add("Log directory: " + cluster.getLogDir());
@@ -176,6 +175,11 @@ public class ValidateCommand extends AbstractCommand {
             LOGGER.warn(msg);
         }
 
+        if (cluster.getStore() == null || cluster.getStore().length() == 0) {
+            LOGGER.fatal("store not defined at cluster" + ERROR);
+            return false;
+        }
+        return true;
     }
 
     private boolean validateNodeConfiguration(Node node, Cluster cluster) {
@@ -201,14 +205,6 @@ public class ValidateCommand extends AbstractCommand {
             }
         }
 
-        if (node.getStore() == null || node.getStore().length() == 0) {
-            if (!cluster.getMasterNode().getId().equals(node.getId())
-                    && (cluster.getStore() == null || cluster.getStore().length() == 0)) {
-                valid = false;
-                LOGGER.fatal("store not defined at cluster/node level for node: " + node.getId() + ERROR);
-            }
-        }
-
         if (node.getIodevices() == null || node.getIodevices().length() == 0) {
             if (!cluster.getMasterNode().getId().equals(node.getId())
                     && (cluster.getIodevices() == null || cluster.getIodevices().length() == 0)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
index 56da6ee..1ac60ba 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
@@ -48,14 +48,11 @@ public class InstallerUtil {
         String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
         String[] storeDirs = null;
         StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
+        String storeDirValue = cluster.getStore();
         if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
+            throw new IllegalStateException(" Store not defined for node " + node.getId());
         }
+        storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
 
         storeDirs = storeDirValue.split(",");
         for (String ns : storeDirs) {
@@ -66,8 +63,8 @@ public class InstallerUtil {
         return nodeDataStore.toString();
     }
 
-    public static AsterixConfiguration getAsterixConfiguration(String asterixConf) throws FileNotFoundException,
-            IOException, JAXBException {
+    public static AsterixConfiguration getAsterixConfiguration(String asterixConf)
+            throws FileNotFoundException, IOException, JAXBException {
         if (asterixConf == null) {
             asterixConf = InstallerDriver.getManagixHome() + File.separator + DEFAULT_ASTERIX_CONFIGURATION_PATH;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index f22b2f1..5317fc2 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,6 +32,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -62,8 +63,10 @@ import org.apache.asterix.metadata.entities.Node;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
@@ -108,7 +111,6 @@ public class MetadataBootstrap {
     private static IIOManager ioManager;
 
     private static String metadataNodeName;
-    private static String metadataStore;
     private static Set<String> nodeNames;
     private static String outputDir;
 
@@ -147,9 +149,7 @@ public class MetadataBootstrap {
 
         AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
         metadataNodeName = metadataProperties.getMetadataNodeName();
-        metadataStore = metadataProperties.getMetadataStore();
         nodeNames = metadataProperties.getNodeNames();
-        // nodeStores = asterixProperity.getStores();
 
         dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
         localResourceRepository = runtimeContext.getLocalResourceRepository();
@@ -375,11 +375,14 @@ public class MetadataBootstrap {
 
     private static void enlistMetadataDataset(IMetadataIndex index, boolean create, MetadataTransactionContext mdTxnCtx)
             throws Exception {
-        String filePath = ioManager.getIODevices().get(runtimeContext.getMetaDataIODeviceId()).getPath()
-                + File.separator
-                + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
-                        runtimeContext.getMetaDataIODeviceId());
-        FileReference file = new FileReference(new File(filePath));
+        ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
+        int metadataDeviceId = metadataPartition.getIODeviceNum();
+        String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
+                AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
+                metadataPartition.getPartitionId());
+        String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
+        FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
                 .getVirtualBufferCaches(index.getDatasetId().getId());
         ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -391,7 +394,7 @@ public class MetadataBootstrap {
                 ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
                 : new BaseOperationTracker(index.getDatasetId().getId(),
                         dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
-        final String path = file.getFile().getPath();
+        final String absolutePath = file.getFile().getPath();
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -409,12 +412,13 @@ public class MetadataBootstrap {
             ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
-            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
-            dataLifecycleManager.register(path, lsmBtree);
+            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
+                    metadataPartition.getPartitionId(), absolutePath));
+            dataLifecycleManager.register(absolutePath, lsmBtree);
         } else {
-            final LocalResource resource = localResourceRepository.getResourceByName(path);
+            final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
+            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -424,7 +428,7 @@ public class MetadataBootstrap {
                         opTracker, runtimeContext.getLSMIOScheduler(),
                         LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                         null, null, null, null, true);
-                dataLifecycleManager.register(path, lsmBtree);
+                dataLifecycleManager.register(absolutePath, lsmBtree);
             }
         }
 
@@ -529,4 +533,4 @@ public class MetadataBootstrap {
             MetadataManager.INSTANCE.releaseWriteLatch();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
deleted file mode 100644
index 92ab90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.annotations.TypeDataGen;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataManager;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class AqlCompiledMetadataDeclarations {
-    private static Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
-
-    // We are assuming that there is a one AqlCompiledMetadataDeclarations per
-    // transaction.
-    private final MetadataTransactionContext mdTxnCtx;
-    private String dataverseName = null;
-    private FileSplit outputFile;
-    private Map<String, String[]> stores;
-    private IDataFormat format;
-    private Map<String, String> config;
-
-    private final Map<String, IAType> types;
-    private final Map<String, TypeDataGen> typeDataGenMap;
-    private final IAWriterFactory writerFactory;
-
-    private IMetadataManager metadataManager = MetadataManager.INSTANCE;
-    private boolean isConnected = false;
-
-    public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
-            Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
-        this.mdTxnCtx = mdTxnCtx;
-        this.dataverseName = dataverseName;
-        this.outputFile = outputFile;
-        this.config = config;
-        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
-        if (stores == null && online) {
-            this.stores = metadataProperties.getStores();
-        } else {
-            this.stores = stores;
-        }
-        this.types = types;
-        this.typeDataGenMap = typeDataGenMap;
-        this.writerFactory = writerFactory;
-    }
-
-    public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
-        if (isConnected) {
-            throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
-        }
-        Dataverse dv;
-        try {
-            dv = metadataManager.getDataverse(mdTxnCtx, dvName);
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-        if (dv == null) {
-            throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
-        }
-        dataverseName = dvName;
-        isConnected = true;
-        try {
-            format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-    }
-
-    public void disconnectFromDataverse() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You are not connected to any dataverse");
-        }
-        dataverseName = null;
-        format = null;
-        isConnected = false;
-    }
-
-    public boolean isConnectedToDataverse() {
-        return isConnected;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public FileSplit getOutputFile() {
-        return outputFile;
-    }
-
-    public IDataFormat getFormat() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You need first to connect to a dataverse.");
-        }
-        return format;
-    }
-
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public IAType findType(String typeName) {
-        Datatype type;
-        try {
-            type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
-        } catch (Exception e) {
-            throw new IllegalStateException();
-        }
-        if (type == null) {
-            throw new IllegalStateException();
-        }
-        return type.getDatatype();
-    }
-
-    public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
-        NodeGroup ng;
-        try {
-            ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        if (ng == null) {
-            throw new AlgebricksException("No node group with this name " + nodeGroupName);
-        }
-        return ng.getNodeNames();
-    }
-
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
-    public Dataset findDataset(String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public void setOutputFile(FileSplit outputFile) {
-        this.outputFile = outputFile;
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-            String datasetName, String targetIdxName) throws AlgebricksException {
-        FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
-            throws AlgebricksException {
-
-        File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
-        Dataset dataset = findDataset(datasetName);
-        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
-            throw new AlgebricksException("Not an internal dataset");
-        }
-        List<String> nodeGroup = findNodeGroupNodeNames(dataset.getNodeGroupName());
-        if (nodeGroup == null) {
-            throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-        }
-
-        List<FileSplit> splitArray = new ArrayList<FileSplit>();
-        for (String nd : nodeGroup) {
-            String[] nodeStores = stores.get(nd);
-            if (nodeStores == null) {
-                LOGGER.warning("Node " + nd + " has no stores.");
-                throw new AlgebricksException("Node " + nd + " has no stores.");
-            } else {
-                for (int j = 0; j < nodeStores.length; j++) {
-                    File f = new File(nodeStores[j] + File.separator + relPathFile);
-                    splitArray.add(new FileSplit(nd, new FileReference(f)));
-                }
-            }
-        }
-        FileSplit[] splits = new FileSplit[splitArray.size()];
-        int i = 0;
-        for (FileSplit fs : splitArray) {
-            splits[i++] = fs;
-        }
-        return splits;
-    }
-
-    public String getRelativePath(String fileName) {
-        return dataverseName + File.separator + fileName;
-    }
-
-    public Map<String, TypeDataGen> getTypeDataGenMap() {
-        return typeDataGenMap;
-    }
-
-    public Map<String, IAType> getTypeDeclarations() {
-        return types;
-    }
-
-    public IAWriterFactory getWriterFactory() {
-        return writerFactory;
-    }
-
-    public MetadataTransactionContext getMetadataTransactionContext() {
-        return mdTxnCtx;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
index ad06737..e960336 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
@@ -19,10 +19,6 @@
 
 package org.apache.asterix.metadata.declared;
 
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
@@ -49,16 +45,6 @@ public class AqlLogicalPlanAndMetadataImpl implements ILogicalPlanAndMetadata {
 
     @Override
     public AlgebricksPartitionConstraint getClusterLocations() {
-        Map<String, String[]> stores = metadataProvider.getAllStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String k : stores.keySet()) {
-            String[] nodeStores = stores.get(k);
-            for (int j = 0; j < nodeStores.length; j++) {
-                locs.add(k);
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
+        return metadataProvider.getClusterLocations();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d61d323..745f436 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -91,6 +91,7 @@ import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.metadata.feeds.FeedUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -148,7 +149,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -160,16 +160,13 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@@ -2095,10 +2092,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return jobId;
     }
 
-    public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
-        return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
-    }
-
     public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
             throws AlgebricksException {
         return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
@@ -2128,7 +2121,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
                 .getNodeNames();
         for (String nd : nodeGroup) {
-            numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+            numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
         return numElementsHint /= numPartitions;
     }
@@ -2141,88 +2134,17 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
             String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
         FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return splitProviderAndPartitionConstraints(splits);
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraints(splits);
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
             String dataverse) {
-        FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
-            FileSplit[] splits) {
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
-        File relPathFile = new File(dataverseName);
-        List<FileSplit> splits = new ArrayList<FileSplit>();
-        for (Map.Entry<String, String[]> entry : stores.entrySet()) {
-            String node = entry.getKey();
-            String[] nodeStores = entry.getValue();
-            if (nodeStores == null) {
-                continue;
-            }
-            for (int i = 0; i < nodeStores.length; i++) {
-                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
-                String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(node);
-                for (int j = 0; j < nodeStores.length; j++) {
-                    for (int k = 0; k < numIODevices; k++) {
-                        File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                        splits.add(new FileSplit(node, new FileReference(f), k));
-                    }
-                }
-            }
-        }
-        return splits.toArray(new FileSplit[] {});
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
     }
 
     public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
             String targetIdxName, boolean temp) throws AlgebricksException {
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    int numIODevices;
-                    if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
-                        numIODevices = 1;
-                    } else {
-                        numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                    }
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    for (int j = 0; j < nodeStores.length; j++) {
-                        for (int k = 0; k < numIODevices; k++) {
-                            File f = new File(ioDevices[k] + File.separator + nodeStores[j]
-                                    + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
-                                    + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), k));
-                        }
-                    }
-                }
-            }
-            return splitArray.toArray(new FileSplit[0]);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
     }
 
     private static Map<String, String> initializeAdapterFactoryMapping() {
@@ -2256,10 +2178,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return adapter;
     }
 
-    private static String getRelativePath(String dataverseName, String fileName) {
-        return dataverseName + File.separator + fileName;
-    }
-
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
         try {
             return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
@@ -2307,19 +2225,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     public AlgebricksPartitionConstraint getClusterLocations() {
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
     public IDataFormat getFormat() {
@@ -2334,7 +2240,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      * @return a new map containing the original dataset properties and the
      *         scheduler/locations
      */
-    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+    private static Map<String, Object> wrapProperties(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         // wrappedProperties.put(SCHEDULER, hdfsScheduler);
@@ -2349,7 +2255,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      *            the original properties
      * @return the new stirng-object map
      */
-    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+    private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         return wrappedProperties;
@@ -2357,58 +2263,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    // Only the first partition when create
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    if (create) {
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            File f = new File(
-                                    ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                        }
-                    } else {
-                        int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            for (int k = 0; k < numIODevices; k++) {
-                                File f = new File(
-                                        ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                                splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                            }
-                        }
-                    }
-                }
-            }
-            FileSplit[] splits = new FileSplit[splitArray.size()];
-            int i = 0;
-            for (FileSplit fs : splitArray) {
-                splits[i++] = fs;
-            }
-            return splits;
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+                datasetName, targetIdxName, create);
     }
 
     public AsterixStorageProperties getStorageProperties() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
new file mode 100644
index 0000000..5ef58cd
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitsAndConstraintsUtil {
+
+    public static final String PARTITION_DIR_PREFIX = "partition_";
+    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+
+    private static FileSplit[] splitsForDataverse(String dataverseName) {
+        File relPathFile = new File(dataverseName);
+        List<FileSplit> splits = new ArrayList<FileSplit>();
+        //get all partitions
+        ClusterPartition[] clusterPartition = AsterixClusterProperties.INSTANCE.getClusterPartitons();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        for (int j = 0; j < clusterPartition.length; j++) {
+            int nodeParitions = AsterixClusterProperties.INSTANCE
+                    .getNodePartitionsCount(clusterPartition[j].getNodeId());
+            for (int i = 0; i < nodeParitions; i++) {
+                File f = new File(prepareStoragePartitionPath(storageDirName, clusterPartition[i].getPartitionId())
+                        + File.separator + relPathFile);
+                splits.add(getFileSplitForClusterPartition(clusterPartition[j], f));
+            }
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
+    public static FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nd : nodeGroup) {
+                int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nd);
+                //currently this case is never executed since the metadata group doesn't exists
+                if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+                    numPartitions = 1;
+                }
+
+                for (int k = 0; k < numPartitions; k++) {
+                    //format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
+                    File f = new File(prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
+                            + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
+                            + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private static FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nodeId : nodeGroup) {
+                //get node partitions
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeId);
+                String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+                int firstPartition = 0;
+                if (create) {
+                    // Only the first partition when create
+                    File f = new File(
+                            prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId())
+                                    + File.separator + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+                } else {
+                    for (int k = 0; k < nodePartitions.length; k++) {
+                        File f = new File(prepareStoragePartitionPath(storageDirName,
+                                nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
+                        splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+                    }
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+            String dataverse) {
+        FileSplit[] splits = splitsForDataverse(dataverse);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
+            boolean create) throws AlgebricksException {
+        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+            FileSplit[] splits) {
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        String[] loc = new String[splits.length];
+        for (int p = 0; p < splits.length; p++) {
+            loc[p] = splits[p].getNodeName();
+        }
+        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
+        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+    }
+
+    private static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+        return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
+                partition.getPartitionId());
+    }
+
+    public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
+        return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitonId;
+    }
+
+    private static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+        return dataverseName + File.separator + datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index f2482da..95eea63 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,6 +34,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -50,7 +52,7 @@ public class AsterixClusterProperties {
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
 
     private static final String IO_DEVICES = "iodevices";
-
+    private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
     private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
 
     private final Cluster cluster;
@@ -59,6 +61,9 @@ public class AsterixClusterProperties {
 
     private boolean globalRecoveryCompleted = false;
 
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
         if (is != null) {
@@ -66,37 +71,73 @@ public class AsterixClusterProperties {
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
-
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
             }
         } else {
             cluster = null;
         }
+        //if this is the CC process
+        if (AsterixAppContextInfo.getInstance() != null) {
+            if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
+                node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
+                clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+            }
+        }
     }
 
     private ClusterState state = ClusterState.UNUSABLE;
 
     public synchronized void removeNCConfiguration(String nodeId) {
+        updateNodePartitions(nodeId, false);
         ncConfiguration.remove(nodeId);
-        if (ncConfiguration.keySet().size() != AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.UNUSABLE;
-            LOGGER.info("Cluster now is in UNSABLE state");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Removing configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+        //TODO implement fault tolerance as follows:
+        //1. collect the partitions of the failed NC
+        //2. For each partition, request a remote replica to take over. 
+        //3. wait until each remote replica completes the recovery for the lost partitions
+        //4. update the cluster state
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
         ncConfiguration.put(nodeId, configuration);
-        if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.ACTIVE;
-        }
+        updateNodePartitions(nodeId, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+    }
+
+    private synchronized void updateNodePartitions(String nodeId, boolean added) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        //if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                //set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                } else {
+                    p.setActiveNodeId(null);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    private synchronized void updateClusterState() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNSABLE state");
+                return;
+            }
+        }
+        //if all storage partitions are active, then the cluster is active
+        state = ClusterState.ACTIVE;
+        LOGGER.info("Cluster is now ACTIVE");
     }
 
     /**
@@ -162,20 +203,14 @@ public class AsterixClusterProperties {
     }
 
     private synchronized void resetClusterPartitionConstraint() {
-        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+        ArrayList<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
     }
 
     public boolean isGlobalRecoveryCompleted() {
@@ -194,7 +229,34 @@ public class AsterixClusterProperties {
         return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
     }
 
-    public static int getNumberOfNodes(){
+    public static int getNumberOfNodes() {
         return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
     }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        if (node2PartitionsMap.containsKey(node)) {
+            return node2PartitionsMap.get(node).length;
+        }
+        return 0;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitons() {
+        ArrayList<ClusterPartition> partitons = new ArrayList<>();
+        for (ClusterPartition cluster : clusterPartitions.values()) {
+            partitons.add(cluster);
+        }
+        return partitons.toArray(new ClusterPartition[] {});
+    }
+
+    public String getStorageDirectoryName() {
+        if (cluster != null) {
+            return cluster.getStore();
+        }
+        //virtual cluster without cluster config file
+        return DEFAULT_STORAGE_DIR_NAME;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 58bc44e..794f867 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 48ebff3..06a1957 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,12 +47,12 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 23eb2be..f2a6820 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 6e0394a..8d838a3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index 0566367..15224e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -33,7 +33,7 @@ public class PersistentLocalResourceFactory implements ILocalResourceFactory {
     }
 
     @Override
-    public LocalResource createLocalResource(long resourceId, String resourceName, int partition) {
-        return new LocalResource(resourceId, resourceName, partition, resourceType, localResourceMetadata);
+    public LocalResource createLocalResource(long resourceId, String resourceName, int partition, String resourcePath) {
+        return new LocalResource(resourceId, resourceName, partition, resourcePath, resourceType, localResourceMetadata);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 8ae3eb1..52fd806 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -64,8 +64,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
-            throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
         for (int i = 0; i < mountPoints.length; i++) {
@@ -123,7 +122,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             }
 
             LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
-                    storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+                    storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+                    storageRootDirPath);
             insert(rootLocalResource);
             LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
         }
@@ -131,13 +131,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     @Override
-    public LocalResource getResourceByName(String name) throws HyracksDataException {
-        LocalResource resource = resourceCache.getIfPresent(name);
+    public LocalResource getResourceByPath(String path) throws HyracksDataException {
+        LocalResource resource = resourceCache.getIfPresent(path);
         if (resource == null) {
-            File resourceFile = getLocalResourceFileByName(name);
+            File resourceFile = getLocalResourceFileByName(path);
             if (resourceFile.exists()) {
                 resource = readLocalResource(resourceFile);
-                resourceCache.put(name, resource);
+                resourceCache.put(path, resource);
             }
         }
         return resource;
@@ -145,13 +145,15 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
     @Override
     public synchronized void insert(LocalResource resource) throws HyracksDataException {
-        File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
+        File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
         if (resourceFile.exists()) {
             throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+        } else {
+            resourceFile.getParentFile().mkdirs();
         }
 
         if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-            resourceCache.put(resource.getResourceName(), resource);
+            resourceCache.put(resource.getResourcePath(), resource);
         }
 
         FileOutputStream fos = null;
@@ -182,18 +184,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
             //if replication enabled, send resource metadata info to remote nodes
             if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
+                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
                 createReplicationJob(ReplicationOperation.REPLICATE, filePath);
             }
         }
     }
 
     @Override
-    public synchronized void deleteResourceByName(String name) throws HyracksDataException {
-        File resourceFile = getLocalResourceFileByName(name);
+    public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+        File resourceFile = getLocalResourceFileByName(resourcePath);
         if (resourceFile.exists()) {
             resourceFile.delete();
-            resourceCache.invalidate(name);
+            resourceCache.invalidate(resourcePath);
 
             //if replication enabled, delete resource from remote replicas
             if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
@@ -204,8 +206,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         }
     }
 
-    private static File getLocalResourceFileByName(String resourceName) {
-        return new File(resourceName + File.separator + METADATA_FILE_NAME);
+    private static File getLocalResourceFileByName(String resourcePath) {
+        return new File(resourcePath + File.separator + METADATA_FILE_NAME);
     }
 
     public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -220,25 +222,21 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             }
 
             //load all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        resourcesMap.put(localResource.getResourceId(), localResource);
-                                                    }
-                                                }
+            File[] partitions = storageRootDir.listFiles();
+            for (File partition : partitions) {
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                resourcesMap.put(localResource.getResourceId(), localResource);
                                             }
                                         }
                                     }
@@ -263,27 +261,23 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
                 continue;
             }
 
-            //traverse all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        maxResourceId = Math.max(maxResourceId,
-                                                                localResource.getResourceId());
-                                                    }
-                                                }
+            //load all local resources.
+            File[] partitions = storageRootDir.listFiles();
+            for (File partition : partitions) {
+                //traverse all local resources.
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
                                             }
                                         }
                                     }
@@ -305,8 +299,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             if (!baseDir.endsWith(System.getProperty("file.separator"))) {
                 baseDir += System.getProperty("file.separator");
             }
-            String fileName = new String(baseDir + METADATA_FILE_NAME);
-            return fileName;
+            return new String(baseDir + METADATA_FILE_NAME);
         }
     }
 
@@ -376,6 +369,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
     /**
      * Deletes physical files of all data verses.
+     *
      * @param deleteStorageMetadata
      * @throws IOException
      */