You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:42 UTC

[07/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index 581d01c,0000000..0ac4f56
mode 100644,000000..100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@@ -1,277 -1,0 +1,282 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.asterix.metadata.utils;
 +
 +import java.io.DataOutput;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.builders.IARecordBuilder;
 +import org.apache.asterix.builders.RecordBuilder;
 +import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 +import org.apache.asterix.common.config.MetadataConstants;
 +import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.indexing.IndexingConstants;
 +import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 +import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 +import org.apache.asterix.metadata.MetadataException;
 +import org.apache.asterix.metadata.MetadataManager;
 +import org.apache.asterix.metadata.MetadataTransactionContext;
++import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 +import org.apache.asterix.metadata.entities.CompactionPolicy;
 +import org.apache.asterix.metadata.entities.Dataset;
 +import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 +import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 +import org.apache.asterix.om.base.AMutableString;
 +import org.apache.asterix.om.base.AString;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.asterix.om.types.BuiltinType;
 +import org.apache.asterix.om.types.IAType;
 +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 +import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 +import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 +
 +public class DatasetUtils {
 +    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
 +            IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException {
 +        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
 +        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            // Get comparators for RID fields.
 +            for (int i = 0; i < partitioningKeys.size(); i++) {
 +                try {
 +                    bcfs[i] = IndexingConstants.getComparatorFactory(i);
 +                } catch (AsterixException e) {
 +                    throw new AlgebricksException(e);
 +                }
 +            }
 +        } else {
 +            for (int i = 0; i < partitioningKeys.size(); i++) {
 +                IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
 +                bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
 +            }
 +        }
 +        return bcfs;
 +    }
 +
 +    public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            throw new AlgebricksException("not implemented");
 +        }
 +        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
 +        int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
 +        for (int i = 0; i < partitioningKeys.size(); ++i) {
 +            bloomFilterKeyFields[i] = i;
 +        }
 +        return bloomFilterKeyFields;
 +    }
 +
 +    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
 +            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            throw new AlgebricksException("not implemented");
 +        }
 +        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
 +        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
 +        for (int i = 0; i < partitioningKeys.size(); i++) {
 +            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
 +            bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
 +        }
 +        return bhffs;
 +    }
 +
-     public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType)
-             throws AlgebricksException {
-         return computeTupleTypeTraits(dataset, itemType, null);
-     }
- 
 +    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
 +            throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            throw new AlgebricksException("not implemented");
 +        }
 +        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
 +        int numKeys = partitioningKeys.size();
 +        ITypeTraits[] typeTraits;
 +        if (metaItemType != null) {
 +            typeTraits = new ITypeTraits[numKeys + 2];
 +            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
 +            typeTraits[numKeys + 1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
 +            for (int i = 0; i < numKeys; i++) {
 +                IAType keyType;
 +                if (indicator.get(i) == 0) {
 +                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
 +                } else {
 +                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
 +                }
 +                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 +            }
 +        } else {
 +            typeTraits = new ITypeTraits[numKeys + 1];
 +            for (int i = 0; i < numKeys; i++) {
 +                IAType keyType;
 +                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
 +                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 +            }
 +        }
 +        typeTraits[numKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
 +        return typeTraits;
 +    }
 +
 +    public static List<List<String>> getPartitioningKeys(Dataset dataset) {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
 +        }
 +        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
 +    }
 +
 +    public static List<String> getFilterField(Dataset dataset) {
 +        return (((InternalDatasetDetails) dataset.getDatasetDetails())).getFilterField();
 +    }
 +
 +    public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
 +            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
-                     throws AlgebricksException {
++            throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return null;
 +        }
 +        List<String> filterField = getFilterField(dataset);
 +        if (filterField == null) {
 +            return null;
 +        }
 +        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
 +        IAType type = itemType.getSubFieldType(filterField);
 +        bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
 +        return bcfs;
 +    }
 +
 +    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
 +            throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return null;
 +        }
 +        List<String> filterField = getFilterField(dataset);
 +        if (filterField == null) {
 +            return null;
 +        }
 +        ITypeTraits[] typeTraits = new ITypeTraits[1];
 +        IAType type = itemType.getSubFieldType(filterField);
 +        typeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
 +        return typeTraits;
 +    }
 +
 +    public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return null;
 +        }
 +
 +        List<String> filterField = getFilterField(dataset);
 +        if (filterField == null) {
 +            return null;
 +        }
 +        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
 +        int numKeys = partitioningKeys.size();
 +
 +        int[] filterFields = new int[1];
 +        filterFields[0] = numKeys + 1;
 +        return filterFields;
 +    }
 +
 +    public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return null;
 +        }
 +
 +        List<String> filterField = getFilterField(dataset);
 +        if (filterField == null) {
 +            return null;
 +        }
 +
 +        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
 +        int valueFields = dataset.hasMetaPart() ? 2 : 1;
 +        int[] btreeFields = new int[partitioningKeys.size() + valueFields];
 +        for (int i = 0; i < btreeFields.length; ++i) {
 +            btreeFields[i] = i;
 +        }
 +        return btreeFields;
 +    }
 +
 +    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
 +        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
 +        for (int i = 0; i < partitioningKeys.size(); i++) {
 +            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
 +                return i;
 +            }
 +        }
 +        return -1;
 +    }
 +
 +    public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
 +            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
 +        String policyName = dataset.getCompactionPolicy();
 +        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
 +                MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
 +        String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
 +        ILSMMergePolicyFactory mergePolicyFactory;
 +        try {
 +            mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
 +            if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
 +                ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
 +            }
 +        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
 +            throw new AlgebricksException(e);
 +        }
 +        Map<String, String> properties = dataset.getCompactionPolicyProperties();
 +        return new Pair<ILSMMergePolicyFactory, Map<String, String>>(mergePolicyFactory, properties);
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
 +            throws HyracksDataException {
 +        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
 +        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
 +        propertyRecordBuilder.reset(recordType);
 +        AMutableString aString = new AMutableString("");
 +        ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
 +                .getSerializerDeserializer(BuiltinType.ASTRING);
 +
 +        // write field 0
 +        fieldValue.reset();
 +        aString.setValue(name);
 +        stringSerde.serialize(aString, fieldValue.getDataOutput());
 +        propertyRecordBuilder.addField(0, fieldValue);
 +
 +        // write field 1
 +        fieldValue.reset();
 +        aString.setValue(value);
 +        stringSerde.serialize(aString, fieldValue.getDataOutput());
 +        propertyRecordBuilder.addField(1, fieldValue);
 +
 +        propertyRecordBuilder.write(out, true);
 +    }
++
++    public static ARecordType getMetaType(AqlMetadataProvider metadataProvider, Dataset dataset)
++            throws AlgebricksException {
++        if (dataset.hasMetaPart()) {
++            return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
++                    dataset.getMetaItemTypeName());
++        }
++        return null;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
index 51c3802,0000000..0e9aa0c
mode 100644,000000..100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
@@@ -1,59 -1,0 +1,63 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.om.util;
 +
 +import java.net.InetAddress;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
++import org.apache.hyracks.control.cc.ClusterControllerService;
++
 +/**
 + * Utility class for obtaining information on the set of Hyracks NodeController
 + * processes that are running on a given host.
 + */
 +public class AsterixRuntimeUtil {
 +
 +    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws Exception {
 +        Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
 +        Set<String> nodeControllersAtLocation = nodeControllerInfo.get(ipAddress);
 +        return nodeControllersAtLocation;
 +    }
 +
 +    public static List<String> getAllNodeControllers() throws Exception {
 +        Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
 +        List<String> nodeControllers = new ArrayList<String>();
 +        for (Set<String> ncCollection : nodeControllersCollection) {
 +            nodeControllers.addAll(ncCollection);
 +        }
 +        return nodeControllers;
 +    }
 +
 +    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws Exception {
 +        Map<InetAddress, Set<String>> map = new HashMap<InetAddress, Set<String>>();
 +        AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
 +        return map;
 +    }
 +
 +    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
-         AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
++        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
++                .getCCApplicationContext().getControllerService();
++        map.putAll(ccs.getIpAddressNodeNameMap());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 2bf5fa3,0000000..a349e51
mode 100644,000000..100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@@ -1,154 -1,0 +1,154 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.replication.storage;
 +
 +import java.io.DataInput;
 +import java.io.DataOutputStream;
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
 +import org.apache.asterix.common.utils.StoragePathUtil;
 +import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 +
 +public class LSMIndexFileProperties {
 +
 +    private String fileName;
 +    private long fileSize;
 +    private String nodeId;
 +    private String dataverse;
 +    private String idxName;
 +    private boolean lsmComponentFile;
 +    private String filePath;
 +    private boolean requiresAck = false;
 +    private long LSNByteOffset;
 +    private int partition;
 +
 +    public LSMIndexFileProperties() {
 +    }
 +
 +    public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
 +            long LSNByteOffset, boolean requiresAck) {
 +        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
 +    }
 +
 +    public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
 +        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
 +                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
 +    }
 +
 +    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
 +            boolean requiresAck) {
 +        this.filePath = filePath;
 +        this.fileSize = fileSize;
 +        this.nodeId = nodeId;
 +        this.lsmComponentFile = lsmComponentFile;
 +        this.LSNByteOffset = LSNByteOffset;
 +        this.requiresAck = requiresAck;
 +    }
 +
 +    public void splitFileName() {
 +        String[] tokens = filePath.split(File.separator);
 +        int arraySize = tokens.length;
 +        this.fileName = tokens[arraySize - 1];
 +        this.idxName = tokens[arraySize - 2];
 +        this.dataverse = tokens[arraySize - 3];
-         this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]);
++        this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
 +    }
 +
 +    public void serialize(OutputStream out) throws IOException {
 +        DataOutputStream dos = new DataOutputStream(out);
 +        dos.writeUTF(nodeId);
 +        dos.writeUTF(filePath);
 +        dos.writeLong(fileSize);
 +        dos.writeBoolean(lsmComponentFile);
 +        dos.writeLong(LSNByteOffset);
 +        dos.writeBoolean(requiresAck);
 +    }
 +
 +    public static LSMIndexFileProperties create(DataInput input) throws IOException {
 +        String nodeId = input.readUTF();
 +        String filePath = input.readUTF();
 +        long fileSize = input.readLong();
 +        boolean lsmComponentFile = input.readBoolean();
 +        long LSNByteOffset = input.readLong();
 +        boolean requiresAck = input.readBoolean();
 +        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
 +                LSNByteOffset, requiresAck);
 +        return fileProp;
 +    }
 +
 +    public String getFilePath() {
 +        return filePath;
 +    }
 +
 +    public long getFileSize() {
 +        return fileSize;
 +    }
 +
 +    public String getFileName() {
 +        return fileName;
 +    }
 +
 +    public String getNodeId() {
 +        return nodeId;
 +    }
 +
 +    public String getDataverse() {
 +        return dataverse;
 +    }
 +
 +    public void setDataverse(String dataverse) {
 +        this.dataverse = dataverse;
 +    }
 +
 +    public String getIdxName() {
 +        return idxName;
 +    }
 +
 +    public boolean isLSMComponentFile() {
 +        return lsmComponentFile;
 +    }
 +
 +    public boolean requiresAck() {
 +        return requiresAck;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("File Name: " + fileName + "  ");
 +        sb.append("File Size: " + fileSize + "  ");
 +        sb.append("Node ID: " + nodeId + "  ");
 +        sb.append("Partition: " + partition + "  ");
 +        sb.append("IDX Name: " + idxName + "  ");
 +        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
 +        sb.append("Dataverse: " + dataverse);
 +        sb.append("LSN Byte Offset: " + LSNByteOffset);
 +        return sb.toString();
 +    }
 +
 +    public long getLSNByteOffset() {
 +        return LSNByteOffset;
 +    }
 +
 +    public int getPartition() {
 +        return partition;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-runtime/pom.xml
----------------------------------------------------------------------
diff --cc asterixdb/asterix-runtime/pom.xml
index 6414139,0000000..1914336
mode 100644,000000..100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@@ -1,82 -1,0 +1,72 @@@
 +<!--
 + ! Licensed to the Apache Software Foundation (ASF) under one
 + ! or more contributor license agreements.  See the NOTICE file
 + ! distributed with this work for additional information
 + ! regarding copyright ownership.  The ASF licenses this file
 + ! to you under the Apache License, Version 2.0 (the
 + ! "License"); you may not use this file except in compliance
 + ! with the License.  You may obtain a copy of the License at
 + !
 + !   http://www.apache.org/licenses/LICENSE-2.0
 + !
 + ! Unless required by applicable law or agreed to in writing,
 + ! software distributed under the License is distributed on an
 + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + ! KIND, either express or implied.  See the License for the
 + ! specific language governing permissions and limitations
 + ! under the License.
 + !-->
 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 +    <modelVersion>4.0.0</modelVersion>
 +    <parent>
 +        <artifactId>apache-asterixdb</artifactId>
 +        <groupId>org.apache.asterix</groupId>
 +        <version>0.8.9-SNAPSHOT</version>
 +    </parent>
 +    <artifactId>asterix-runtime</artifactId>
 +    <properties>
 +        <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
 +    </properties>
 +    <licenses>
 +        <license>
 +            <name>Apache License, Version 2.0</name>
 +            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
 +            <distribution>repo</distribution>
 +            <comments>A business-friendly OSS license</comments>
 +        </license>
 +    </licenses>
 +    <dependencies>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-om</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-fuzzyjoin</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hyracks</groupId>
 +            <artifactId>hyracks-storage-am-btree</artifactId>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-transactions</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
-             <groupId>org.twitter4j</groupId>
-             <artifactId>twitter4j-core</artifactId>
-             <version>[4.0,)</version>
-         </dependency>
-         <dependency>
-             <groupId>org.twitter4j</groupId>
-             <artifactId>twitter4j-stream</artifactId>
-             <version>[4.0,)</version>
-         </dependency>
-         <dependency>
 +            <groupId>org.apache.hadoop</groupId>
 +            <artifactId>hadoop-client</artifactId>
 +            <type>jar</type>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hyracks</groupId>
 +            <artifactId>hyracks-api</artifactId>
 +        </dependency>
 +    </dependencies>
 +</project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729,0000000..561b144
mode 100644,000000..100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@@ -1,467 -1,0 +1,478 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.transaction.management.resource;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileOutputStream;
 +import java.io.FilenameFilter;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.cluster.ClusterPartition;
 +import org.apache.asterix.common.config.AsterixMetadataProperties;
 +import org.apache.asterix.common.replication.AsterixReplicationJob;
 +import org.apache.asterix.common.replication.IReplicationManager;
 +import org.apache.asterix.common.utils.StoragePathUtil;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.IODeviceHandle;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 +import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 +import org.apache.hyracks.storage.common.file.LocalResource;
 +
 +import com.google.common.cache.Cache;
 +import com.google.common.cache.CacheBuilder;
 +
 +public class PersistentLocalResourceRepository implements ILocalResourceRepository {
 +
 +    private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
 +    private final String[] mountPoints;
 +    private static final String STORAGE_METADATA_DIRECTORY = "asterix_root_metadata";
 +    private static final String STORAGE_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
 +    private static final long STORAGE_LOCAL_RESOURCE_ID = -4321;
 +    public static final String METADATA_FILE_NAME = ".metadata";
 +    private final Cache<String, LocalResource> resourceCache;
 +    private final String nodeId;
 +    private static final int MAX_CACHED_RESOURCES = 1000;
 +    private IReplicationManager replicationManager;
 +    private boolean isReplicationEnabled = false;
 +    private Set<String> filesToBeReplicated;
 +    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
 +    private final Set<Integer> nodeOriginalPartitions;
 +    private final Set<Integer> nodeActivePartitions;
 +    private Set<Integer> nodeInactivePartitions;
 +
 +    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
 +            AsterixMetadataProperties metadataProperties) throws HyracksDataException {
 +        mountPoints = new String[devices.size()];
 +        this.nodeId = nodeId;
 +        this.clusterPartitions = metadataProperties.getClusterPartitions();
 +        for (int i = 0; i < mountPoints.length; i++) {
 +            String mountPoint = devices.get(i).getPath().getPath();
 +            File mountPointDir = new File(mountPoint);
 +            if (!mountPointDir.exists()) {
 +                throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
 +            }
 +            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
 +                mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
 +            } else {
 +                mountPoints[i] = new String(mountPoint);
 +            }
 +        }
 +        resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
 +
 +        ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
 +        //initially the node active partitions are the same as the original partitions
 +        nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
 +        nodeActivePartitions = new HashSet<>(nodePartitions.length);
 +        for (ClusterPartition partition : nodePartitions) {
 +            nodeOriginalPartitions.add(partition.getPartitionId());
 +            nodeActivePartitions.add(partition.getPartitionId());
 +        }
 +    }
 +
 +    private static String getStorageMetadataDirPath(String mountPoint, String nodeId, int ioDeviceId) {
 +        return mountPoint + STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
 +    }
 +
 +    private static File getStorageMetadataBaseDir(File storageMetadataFile) {
 +        //STORAGE_METADATA_DIRECTORY / Node Id / STORAGE_METADATA_FILE_NAME_PREFIX
 +        return storageMetadataFile.getParentFile().getParentFile();
 +    }
 +
 +    /**
 +     * Creates, on every mount point, the directory of the storage metadata file and then inserts a
 +     * {@link LocalResource} with the id STORAGE_LOCAL_RESOURCE_ID that carries the storage root
 +     * directory path of that mount point.
 +     *
 +     * @param storageRootDirName
 +     *            the storage root directory name; a leading file separator is dropped before it is
 +     *            appended to the mount point
 +     * @throws HyracksDataException
 +     *             if the storage metadata resource cannot be inserted
 +     * @throws IllegalStateException
 +     *             if the directory of the storage metadata file could not be created (this includes
 +     *             the case in which it already exists)
 +     */
 +    public void initializeNewUniverse(String storageRootDirName) throws HyracksDataException {
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Initializing local resource repository ... ");
 +        }
 +
 +        //create storage metadata file (This file is used to locate the root storage directory after instance restarts).
 +        //TODO with the existing cluster configuration file being static and distributed on all NCs, we can find out the storage root
 +        //directory without looking at this file. This file could potentially store more information, otherwise no need to keep it.
 +        final String separator = System.getProperty("file.separator");
 +        int ioDeviceId = 0;
 +        for (String mountPoint : mountPoints) {
 +            final File rootMetadataFile = getStorageMetadataFile(mountPoint, nodeId, ioDeviceId++);
 +            final File rootMetadataDir = rootMetadataFile.getParentFile();
 +            //make dirs for the storage metadata file
 +            if (!rootMetadataDir.mkdirs()) {
 +                throw new IllegalStateException(
 +                        "Unable to create storage metadata directory of PersistentLocalResourceRepository in "
 +                                + rootMetadataDir.getAbsolutePath() + " or directory already exists");
 +            }
 +
 +            LOGGER.log(Level.INFO,
 +                    "created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
 +
 +            //the mount point is used as a plain prefix, so a leading separator of the root name is dropped
 +            final String relativeRootDirName = storageRootDirName.startsWith(separator)
 +                    ? storageRootDirName.substring(separator.length()) : storageRootDirName;
 +            final String storageRootDirPath = mountPoint + relativeRootDirName;
 +
 +            final String rootMetadataFilePath = rootMetadataFile.getAbsolutePath();
 +            insert(new LocalResource(STORAGE_LOCAL_RESOURCE_ID, rootMetadataFilePath, 0, rootMetadataFilePath, 0,
 +                    storageRootDirPath));
 +            LOGGER.log(Level.INFO, "created the root-metadata-file: " + rootMetadataFile.getAbsolutePath());
 +        }
 +        LOGGER.log(Level.INFO, "Completed the initialization of the local resource repository");
 +    }
 +
 +    /**
 +     * Looks the resource up in the cache first; on a miss it is read from the metadata file under
 +     * {@code path} and cached.
 +     *
 +     * @param path
 +     *            the resource path (the directory that contains the metadata file)
 +     * @return the resource, or null if it is neither cached nor present on disk
 +     * @throws HyracksDataException
 +     *             if the metadata file exists but cannot be read
 +     */
 +    @Override
 +    public LocalResource getResourceByPath(String path) throws HyracksDataException {
 +        final LocalResource cached = resourceCache.getIfPresent(path);
 +        if (cached != null) {
 +            return cached;
 +        }
 +        final File metadataFile = getLocalResourceFileByName(path);
 +        if (!metadataFile.exists()) {
 +            return null;
 +        }
 +        final LocalResource loaded = readLocalResource(metadataFile);
 +        resourceCache.put(path, loaded);
 +        return loaded;
 +    }
 +
 +    /**
 +     * Persists the given resource by serializing it into its metadata file and, unless it is the
 +     * storage root resource (id STORAGE_LOCAL_RESOURCE_ID), caches it and, when replication is
 +     * enabled, submits a REPLICATE job for the written file.
 +     *
 +     * @param resource
 +     *            the resource to persist
 +     * @throws HyracksDataException
 +     *             if the metadata file already exists, if writing it fails, or if the replication
 +     *             job cannot be submitted
 +     */
 +    @Override
 +    public synchronized void insert(LocalResource resource) throws HyracksDataException {
 +        final boolean isStorageRootResource = resource.getResourceId() == STORAGE_LOCAL_RESOURCE_ID;
 +        final String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
 +        final File resourceFile = new File(filePath);
 +        if (resourceFile.exists()) {
 +            throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
 +        }
 +        resourceFile.getParentFile().mkdirs();
 +
 +        try (FileOutputStream fos = new FileOutputStream(resourceFile);
 +                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
 +            oosToFos.writeObject(resource);
 +            oosToFos.flush();
 +        } catch (IOException e) {
 +            throw new HyracksDataException(e);
 +        }
 +
 +        //cache the resource only after it was written successfully; otherwise a failed write would
 +        //leave a cached resource that does not exist on disk
 +        if (!isStorageRootResource) {
 +            resourceCache.put(resource.getResourcePath(), resource);
 +        }
 +
 +        //if replication enabled, send resource metadata info to remote nodes
 +        if (isReplicationEnabled && !isStorageRootResource) {
 +            createReplicationJob(ReplicationOperation.REPLICATE, filePath);
 +        }
 +    }
 +
 +    /**
 +     * Deletes the metadata file of the resource under {@code resourcePath}, drops the resource from
 +     * the cache and, when replication is enabled and the file is not a storage metadata file,
 +     * submits a DELETE replication job for it.
 +     *
 +     * @param resourcePath
 +     *            the resource path (the directory that contains the metadata file)
 +     * @throws HyracksDataException
 +     *             if the metadata file does not exist, cannot be deleted, or the replication job
 +     *             cannot be submitted
 +     */
 +    @Override
 +    public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
 +        final File resourceFile = getLocalResourceFileByName(resourcePath);
 +        if (!resourceFile.exists()) {
 +            throw new HyracksDataException("Resource doesn't exist");
 +        }
 +
 +        //Files.delete reports a failed deletion instead of silently returning false, so the cache and
 +        //the remote replicas are only touched once the local file is really gone
 +        try {
 +            Files.delete(resourceFile.toPath());
 +        } catch (IOException e) {
 +            throw new HyracksDataException(e);
 +        }
 +        resourceCache.invalidate(resourcePath);
 +
 +        //if replication enabled, delete resource from remote replicas
 +        if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
 +            createReplicationJob(ReplicationOperation.DELETE, resourceFile.getAbsolutePath());
 +        }
 +    }
 +
 +    private static File getLocalResourceFileByName(String resourcePath) {
 +        return new File(resourcePath + File.separator + METADATA_FILE_NAME);
 +    }
 +
 +    /**
 +     * Reads every metadata file found under storageRoot / partition / dataverse / index on all
 +     * mount points.
 +     *
 +     * @return a map from resource id to the resource read from disk
 +     * @throws HyracksDataException
 +     *             if a storage metadata file or a resource metadata file cannot be read
 +     */
 +    public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
 +        //TODO During recovery, the memory usage currently is proportional to the number of resources available.
 +        //This could be fixed by traversing all resources on disk until the required resource is found.
 +        HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
 +
 +        for (int i = 0; i < mountPoints.length; i++) {
 +            File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
 +            if (storageRootDir == null) {
 +                continue;
 +            }
 +
 +            //load all local resources.
 +            File[] partitions = storageRootDir.listFiles();
 +            if (partitions == null) {
 +                //listFiles() returns null when the path is not a directory or cannot be listed
 +                continue;
 +            }
 +            for (File partition : partitions) {
 +                File[] dataverseFileList = partition.listFiles();
 +                if (dataverseFileList == null) {
 +                    continue;
 +                }
 +                for (File dataverseFile : dataverseFileList) {
 +                    if (!dataverseFile.isDirectory()) {
 +                        continue;
 +                    }
 +                    File[] indexFileList = dataverseFile.listFiles();
 +                    if (indexFileList == null) {
 +                        continue;
 +                    }
 +                    for (File indexFile : indexFileList) {
 +                        if (!indexFile.isDirectory()) {
 +                            continue;
 +                        }
 +                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
 +                        if (metadataFiles == null) {
 +                            continue;
 +                        }
 +                        for (File metadataFile : metadataFiles) {
 +                            LocalResource localResource = readLocalResource(metadataFile);
 +                            resourcesMap.put(localResource.getResourceId(), localResource);
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        return resourcesMap;
 +    }
 +
 +    /**
 +     * Reads every metadata file found under storageRoot / partition / dataverse / index on all
 +     * mount points and tracks the largest resource id.
 +     *
 +     * @return the largest resource id found, or 0 if no resource has a larger id
 +     * @throws HyracksDataException
 +     *             if a storage metadata file or a resource metadata file cannot be read
 +     */
 +    @Override
 +    public long getMaxResourceID() throws HyracksDataException {
 +        long maxResourceId = 0;
 +
 +        for (int i = 0; i < mountPoints.length; i++) {
 +            File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
 +            if (storageRootDir == null) {
 +                continue;
 +            }
 +
 +            //traverse all local resources.
 +            File[] partitions = storageRootDir.listFiles();
 +            if (partitions == null) {
 +                //listFiles() returns null when the path is not a directory or cannot be listed
 +                continue;
 +            }
 +            for (File partition : partitions) {
 +                File[] dataverseFileList = partition.listFiles();
 +                if (dataverseFileList == null) {
 +                    continue;
 +                }
 +                for (File dataverseFile : dataverseFileList) {
 +                    if (!dataverseFile.isDirectory()) {
 +                        continue;
 +                    }
 +                    File[] indexFileList = dataverseFile.listFiles();
 +                    if (indexFileList == null) {
 +                        continue;
 +                    }
 +                    for (File indexFile : indexFileList) {
 +                        if (!indexFile.isDirectory()) {
 +                            continue;
 +                        }
 +                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
 +                        if (metadataFiles == null) {
 +                            continue;
 +                        }
 +                        for (File metadataFile : metadataFiles) {
 +                            LocalResource localResource = readLocalResource(metadataFile);
 +                            maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        return maxResourceId;
 +    }
 +
 +    private static String getFileName(String baseDir, long resourceId) {
 +        if (resourceId == STORAGE_LOCAL_RESOURCE_ID) {
 +            return baseDir;
 +        } else {
 +            if (!baseDir.endsWith(System.getProperty("file.separator"))) {
 +                baseDir += System.getProperty("file.separator");
 +            }
 +            return new String(baseDir + METADATA_FILE_NAME);
 +        }
 +    }
 +
 +    /**
 +     * Deserializes a {@link LocalResource} from the given file.
 +     *
 +     * @param file
 +     *            the file that holds the serialized resource
 +     * @return the resource read from the file
 +     * @throws HyracksDataException
 +     *             wrapping any exception raised while opening, reading or casting the object
 +     */
 +    public static LocalResource readLocalResource(File file) throws HyracksDataException {
 +        try (FileInputStream fileIn = new FileInputStream(file);
 +                ObjectInputStream objectIn = new ObjectInputStream(fileIn)) {
 +            return (LocalResource) objectIn.readObject();
 +        } catch (Exception e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
 +    /** Accepts only the files whose name equals METADATA_FILE_NAME, ignoring case. */
 +    private static final FilenameFilter METADATA_FILES_FILTER =
 +            (dir, name) -> name.equalsIgnoreCase(METADATA_FILE_NAME);
 +
 +    /**
 +     * Stores the replication manager and records whether replication is enabled. When it is, the
 +     * set of files to be replicated and the set of inactive partitions are (re)created empty.
 +     *
 +     * @param replicationManager
 +     *            the replication manager to use for replication jobs
 +     */
 +    public void setReplicationManager(IReplicationManager replicationManager) {
 +        this.replicationManager = replicationManager;
 +        this.isReplicationEnabled = replicationManager.isReplicationEnabled();
 +        if (!this.isReplicationEnabled) {
 +            return;
 +        }
 +        this.filesToBeReplicated = new HashSet<String>();
 +        this.nodeInactivePartitions = ConcurrentHashMap.newKeySet();
 +    }
 +
 +    /**
 +     * Submits a synchronous METADATA replication job for a single file, unless the file belongs to
 +     * a temporary dataset.
 +     *
 +     * @param operation
 +     *            the replication operation to perform on the file
 +     * @param filePath
 +     *            the path of the resource file; it must have at least four path components
 +     * @throws HyracksDataException
 +     *             if the replication job cannot be submitted
 +     */
 +    private void createReplicationJob(ReplicationOperation operation, String filePath) throws HyracksDataException {
-         filesToBeReplicated.clear();
-         filesToBeReplicated.add(filePath);
-         AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
-                 ReplicationExecutionType.SYNC, filesToBeReplicated);
-         try {
-             replicationManager.submitJob(job);
-         } catch (IOException e) {
-             throw new HyracksDataException(e);
++        /**
++         * Durable resources path format:
++         * /partition/dataverse/idx/fileName
++         * Temporary resources path format:
++         * /partition/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
++         */
++        //split() takes a regular expression; the separator is quoted because the Windows separator
++        //("\") is not a valid pattern on its own
++        String[] fileNameTokens = filePath.split(java.util.regex.Pattern.quote(File.separator));
++        //the fourth token from the end is the partition for durable resources and the temporary
++        //datasets folder for temporary resources
++        String partitionOrTempDir = fileNameTokens[fileNameTokens.length - 4];
++        //exclude temporary datasets resources
++        if (!partitionOrTempDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
++            filesToBeReplicated.clear();
++            filesToBeReplicated.add(filePath);
++            AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
++                    ReplicationExecutionType.SYNC, filesToBeReplicated);
++            try {
++                replicationManager.submitJob(job);
++            } catch (IOException e) {
++                throw new HyracksDataException(e);
++            }
 +        }
 +    }
 +
 +    /**
 +     * @return the internal array of mount points (not a copy); each entry ends with the file
 +     *         separator
 +     */
 +    public String[] getStorageMountingPoints() {
 +        return this.mountPoints;
 +    }
 +
 +    /**
 +     * Deletes the storage root directory of every mount point and, optionally, the directory that
 +     * holds the storage metadata.
 +     *
 +     * @param deleteStorageMetadata
 +     *            if true, the storage metadata base directory of every mount point is deleted too
 +     * @throws IOException
 +     *             if a directory cannot be deleted or the storage metadata file cannot be read
 +     */
 +    public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
 +        for (int deviceIdx = 0; deviceIdx < mountPoints.length; deviceIdx++) {
 +            final String mountPoint = mountPoints[deviceIdx];
 +            final File storageRootDir = getStorageRootDirectoryIfExists(mountPoint, nodeId, deviceIdx);
 +            if (storageRootDir != null && storageRootDir.isDirectory()) {
 +                FileUtils.deleteDirectory(storageRootDir);
 +            }
 +
 +            if (!deleteStorageMetadata) {
 +                continue;
 +            }
 +            //delete the metadata root directory
 +            final File metadataBaseDir = getStorageMetadataBaseDir(
 +                    getStorageMetadataFile(mountPoint, nodeId, deviceIdx));
 +            if (metadataBaseDir.exists() && metadataBaseDir.isDirectory()) {
 +                FileUtils.deleteDirectory(metadataBaseDir);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * @param mountPoint
 +     * @param nodeId
 +     * @param ioDeviceId
 +     * @return A file reference to the storage metadata file.
 +     */
 +    private static File getStorageMetadataFile(String mountPoint, String nodeId, int ioDeviceId) {
 +        String storageMetadataFileName = getStorageMetadataDirPath(mountPoint, nodeId, ioDeviceId) + File.separator
 +                + STORAGE_METADATA_FILE_NAME_PREFIX;
 +        File storageMetadataFile = new File(storageMetadataFileName);
 +        return storageMetadataFile;
 +    }
 +
 +    /**
 +     * Reads the storage root directory path from the storage metadata file of the given IO device.
 +     *
 +     * @param mountPoint
 +     * @param nodeId
 +     * @param ioDeviceId
 +     * @return A file reference to the storage root directory if both the storage metadata file and
 +     *         the directory it names exist, otherwise null.
 +     * @throws HyracksDataException
 +     *             if the storage metadata file exists but cannot be read
 +     */
 +    public static File getStorageRootDirectoryIfExists(String mountPoint, String nodeId, int ioDeviceId)
 +            throws HyracksDataException {
 +        final File rootMetadataFile = getStorageMetadataFile(mountPoint, nodeId, ioDeviceId);
 +        if (!rootMetadataFile.exists()) {
 +            return null;
 +        }
 +        final String rootDirPath = (String) readLocalResource(rootMetadataFile).getResourceObject();
 +        return Files.exists(Paths.get(rootDirPath)) ? new File(rootDirPath) : null;
 +    }
 +
 +    /**
 +     * @param partition
 +     *            the id of a partition known to the cluster partitions map
 +     * @return The partition local path on this NC, i.e. the mount point of the partition's IO device.
 +     */
 +    public String getPartitionPath(int partition) {
 +        //currently each partition is replicated on the same IO device number on all NCs.
 +        final int ioDeviceNum = getIODeviceNum(partition);
 +        return mountPoints[ioDeviceNum];
 +    }
 +
 +    /**
 +     * @param partition
 +     *            the id of a partition; it must be a key of the cluster partitions map
 +     * @return the IO device number recorded for the partition
 +     */
 +    public int getIODeviceNum(int partition) {
 +        final ClusterPartition clusterPartition = clusterPartitions.get(partition);
 +        return clusterPartition.getIODeviceNum();
 +    }
 +
 +    /**
 +     * @return an unmodifiable view of the partitions currently active on this node
 +     */
 +    public Set<Integer> getActivePartitions() {
 +        final Set<Integer> activeView = Collections.unmodifiableSet(nodeActivePartitions);
 +        return activeView;
 +    }
 +
 +    /**
 +     * @return an unmodifiable view of the partitions currently inactive on this node; an empty set
 +     *         if the inactive-partitions set was never created, which is the case until
 +     *         {@link #setReplicationManager(IReplicationManager)} is called with replication enabled
 +     */
 +    public Set<Integer> getInactivePartitions() {
 +        if (nodeInactivePartitions == null) {
 +            //Collections.unmodifiableSet(null) would throw a NullPointerException
 +            return Collections.emptySet();
 +        }
 +        return Collections.unmodifiableSet(nodeInactivePartitions);
 +    }
 +
 +    /**
 +     * @return an unmodifiable view of the partitions assigned to this node when the repository was
 +     *         constructed
 +     */
 +    public Set<Integer> getNodeOrignalPartitions() {
 +        final Set<Integer> originalView = Collections.unmodifiableSet(nodeOriginalPartitions);
 +        return originalView;
 +    }
 +
 +    /**
 +     * Marks a partition as active on this node and removes it from the inactive partitions.
 +     * NOTE(review): the inactive set is only created when replication is enabled; presumably this
 +     * is only called in that case - confirm against callers.
 +     *
 +     * @param partitonId
 +     *            the id of the partition to activate
 +     */
 +    public synchronized void addActivePartition(int partitonId) {
 +        final Integer partitionKey = Integer.valueOf(partitonId);
 +        this.nodeActivePartitions.add(partitionKey);
 +        this.nodeInactivePartitions.remove(partitionKey);
 +    }
 +
 +    /**
 +     * Marks a partition as inactive on this node and removes it from the active partitions.
 +     * NOTE(review): the inactive set is only created when replication is enabled; presumably this
 +     * is only called in that case - confirm against callers.
 +     *
 +     * @param partitonId
 +     *            the id of the partition to deactivate
 +     */
 +    public synchronized void addInactivePartition(int partitonId) {
 +        final Integer partitionKey = Integer.valueOf(partitonId);
 +        this.nodeInactivePartitions.add(partitionKey);
 +        this.nodeActivePartitions.remove(partitionKey);
 +    }
 +
 +    /**
 +     * @param resourceAbsolutePath
 +     *            the absolute path of a resource file; it must have at least four path components
 +     * @return the resource relative path starting from the partition directory, i.e. the last four
 +     *         path components joined with the file separator
 +     */
 +    public static String getResourceRelativePath(String resourceAbsolutePath) {
 +        //split() takes a regular expression; the separator is quoted because the Windows separator
 +        //("\") is not a valid pattern on its own
 +        String[] tokens = resourceAbsolutePath.split(java.util.regex.Pattern.quote(File.separator));
-         //partiton/dataverse/idx/fileName
++        //partition/dataverse/idx/fileName
 +        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
 +                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
 +    }
 +
 +    /**
 +     * @param resourceAbsolutePath
 +     *            the absolute path of a resource file in the partition/dataverse/idx/fileName layout;
 +     *            it must have at least four path components
 +     * @return the partition number parsed from the fourth path component from the end
 +     */
 +    public static int getResourcePartition(String resourceAbsolutePath) {
 +        //split() takes a regular expression; the separator is quoted because the Windows separator
 +        //("\") is not a valid pattern on its own
 +        String[] tokens = resourceAbsolutePath.split(java.util.regex.Pattern.quote(File.separator));
-         //partiton/dataverse/idx/fileName
-         return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
++        //partition/dataverse/idx/fileName
++        return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
 +    }
 +}