You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/01 02:17:03 UTC
[2/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
index 0eacc15..e89a8db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.indexing.operators;
import java.util.List;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -49,18 +50,15 @@ public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExte
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- System.err.println("performing the operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getIODeviceId()));
+ FileReference resourecePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
+ System.err.println("performing the operation on "+ resourecePath.getFile().getAbsolutePath());
// Get DataflowHelper
IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
// Get index
IIndex index = indexHelper.getIndexInstance();
// commit transaction
((ITwoPCIndex) index).commitTransaction();
- System.err.println("operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getIODeviceId()) + " Succeded");
+ System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 8e7a288..9bdfaa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,9 +49,7 @@ public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExt
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+ FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
fileManager.recoverTransaction();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 2254f6f..09c65c8 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -27,8 +27,6 @@ import java.util.Set;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
-import org.kohsuke.args4j.Option;
-
import org.apache.asterix.event.management.EventUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.MasterNode;
@@ -37,6 +35,7 @@ import org.apache.asterix.event.service.AsterixEventServiceUtil;
import org.apache.asterix.installer.driver.InstallerDriver;
import org.apache.asterix.installer.schema.conf.Configuration;
import org.apache.asterix.installer.schema.conf.Zookeeper;
+import org.kohsuke.args4j.Option;
public class ValidateCommand extends AbstractCommand {
@@ -97,7 +96,7 @@ public class ValidateCommand extends AbstractCommand {
valid = false;
} else {
cluster = EventUtil.getCluster(clusterPath);
- validateClusterProperties(cluster);
+ valid = valid & validateClusterProperties(cluster);
Set<String> servers = new HashSet<String>();
Set<String> serverIds = new HashSet<String>();
@@ -106,7 +105,7 @@ public class ValidateCommand extends AbstractCommand {
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null, null);
+ masterNode.getLogDir(), null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
@@ -158,7 +157,7 @@ public class ValidateCommand extends AbstractCommand {
return true;
}
- private void validateClusterProperties(Cluster cluster) {
+ private boolean validateClusterProperties(Cluster cluster) {
List<String> tempDirs = new ArrayList<String>();
if (cluster.getLogDir() != null && checkTemporaryPath(cluster.getLogDir())) {
tempDirs.add("Log directory: " + cluster.getLogDir());
@@ -176,6 +175,11 @@ public class ValidateCommand extends AbstractCommand {
LOGGER.warn(msg);
}
+ if (cluster.getStore() == null || cluster.getStore().length() == 0) {
+ LOGGER.fatal("store not defined at cluster" + ERROR);
+ return false;
+ }
+ return true;
}
private boolean validateNodeConfiguration(Node node, Cluster cluster) {
@@ -201,14 +205,6 @@ public class ValidateCommand extends AbstractCommand {
}
}
- if (node.getStore() == null || node.getStore().length() == 0) {
- if (!cluster.getMasterNode().getId().equals(node.getId())
- && (cluster.getStore() == null || cluster.getStore().length() == 0)) {
- valid = false;
- LOGGER.fatal("store not defined at cluster/node level for node: " + node.getId() + ERROR);
- }
- }
-
if (node.getIodevices() == null || node.getIodevices().length() == 0) {
if (!cluster.getMasterNode().getId().equals(node.getId())
&& (cluster.getIodevices() == null || cluster.getIodevices().length() == 0)) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
index 56da6ee..1ac60ba 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
@@ -48,14 +48,11 @@ public class InstallerUtil {
String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
String[] storeDirs = null;
StringBuffer nodeDataStore = new StringBuffer();
- String storeDirValue = node.getStore();
+ String storeDirValue = cluster.getStore();
if (storeDirValue == null) {
- storeDirValue = cluster.getStore();
- if (storeDirValue == null) {
- throw new IllegalStateException(" Store not defined for node " + node.getId());
- }
- storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
+ throw new IllegalStateException(" Store not defined for node " + node.getId());
}
+ storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
storeDirs = storeDirValue.split(",");
for (String ns : storeDirs) {
@@ -66,8 +63,8 @@ public class InstallerUtil {
return nodeDataStore.toString();
}
- public static AsterixConfiguration getAsterixConfiguration(String asterixConf) throws FileNotFoundException,
- IOException, JAXBException {
+ public static AsterixConfiguration getAsterixConfiguration(String asterixConf)
+ throws FileNotFoundException, IOException, JAXBException {
if (asterixConf == null) {
asterixConf = InstallerDriver.getManagixHome() + File.separator + DEFAULT_ASTERIX_CONFIGURATION_PATH;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index f22b2f1..5317fc2 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,6 +32,7 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -62,8 +63,10 @@ import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.AdapterIdentifier;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
@@ -108,7 +111,6 @@ public class MetadataBootstrap {
private static IIOManager ioManager;
private static String metadataNodeName;
- private static String metadataStore;
private static Set<String> nodeNames;
private static String outputDir;
@@ -147,9 +149,7 @@ public class MetadataBootstrap {
AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
metadataNodeName = metadataProperties.getMetadataNodeName();
- metadataStore = metadataProperties.getMetadataStore();
nodeNames = metadataProperties.getNodeNames();
- // nodeStores = asterixProperity.getStores();
dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
localResourceRepository = runtimeContext.getLocalResourceRepository();
@@ -375,11 +375,14 @@ public class MetadataBootstrap {
private static void enlistMetadataDataset(IMetadataIndex index, boolean create, MetadataTransactionContext mdTxnCtx)
throws Exception {
- String filePath = ioManager.getIODevices().get(runtimeContext.getMetaDataIODeviceId()).getPath()
- + File.separator
- + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
- runtimeContext.getMetaDataIODeviceId());
- FileReference file = new FileReference(new File(filePath));
+ ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
+ int metadataDeviceId = metadataPartition.getIODeviceNum();
+ String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
+ AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
+ metadataPartition.getPartitionId());
+ String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
+ FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+
List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
.getVirtualBufferCaches(index.getDatasetId().getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -391,7 +394,7 @@ public class MetadataBootstrap {
? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
: new BaseOperationTracker(index.getDatasetId().getId(),
dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
- final String path = file.getFile().getPath();
+ final String absolutePath = file.getFile().getPath();
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -409,12 +412,13 @@ public class MetadataBootstrap {
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
- dataLifecycleManager.register(path, lsmBtree);
+ localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
+ metadataPartition.getPartitionId(), absolutePath));
+ dataLifecycleManager.register(absolutePath, lsmBtree);
} else {
- final LocalResource resource = localResourceRepository.getResourceByName(path);
+ final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
+ lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -424,7 +428,7 @@ public class MetadataBootstrap {
opTracker, runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
- dataLifecycleManager.register(path, lsmBtree);
+ dataLifecycleManager.register(absolutePath, lsmBtree);
}
}
@@ -529,4 +533,4 @@ public class MetadataBootstrap {
MetadataManager.INSTANCE.releaseWriteLatch();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
deleted file mode 100644
index 92ab90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.annotations.TypeDataGen;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataManager;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class AqlCompiledMetadataDeclarations {
- private static Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
-
- // We are assuming that there is a one AqlCompiledMetadataDeclarations per
- // transaction.
- private final MetadataTransactionContext mdTxnCtx;
- private String dataverseName = null;
- private FileSplit outputFile;
- private Map<String, String[]> stores;
- private IDataFormat format;
- private Map<String, String> config;
-
- private final Map<String, IAType> types;
- private final Map<String, TypeDataGen> typeDataGenMap;
- private final IAWriterFactory writerFactory;
-
- private IMetadataManager metadataManager = MetadataManager.INSTANCE;
- private boolean isConnected = false;
-
- public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
- FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
- Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
- this.mdTxnCtx = mdTxnCtx;
- this.dataverseName = dataverseName;
- this.outputFile = outputFile;
- this.config = config;
- AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
- if (stores == null && online) {
- this.stores = metadataProperties.getStores();
- } else {
- this.stores = stores;
- }
- this.types = types;
- this.typeDataGenMap = typeDataGenMap;
- this.writerFactory = writerFactory;
- }
-
- public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
- if (isConnected) {
- throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
- }
- Dataverse dv;
- try {
- dv = metadataManager.getDataverse(mdTxnCtx, dvName);
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- if (dv == null) {
- throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
- }
- dataverseName = dvName;
- isConnected = true;
- try {
- format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- }
-
- public void disconnectFromDataverse() throws AlgebricksException {
- if (!isConnected) {
- throw new AlgebricksException("You are not connected to any dataverse");
- }
- dataverseName = null;
- format = null;
- isConnected = false;
- }
-
- public boolean isConnectedToDataverse() {
- return isConnected;
- }
-
- public String getDataverseName() {
- return dataverseName;
- }
-
- public FileSplit getOutputFile() {
- return outputFile;
- }
-
- public IDataFormat getFormat() throws AlgebricksException {
- if (!isConnected) {
- throw new AlgebricksException("You need first to connect to a dataverse.");
- }
- return format;
- }
-
- public String getPropertyValue(String propertyName) {
- return config.get(propertyName);
- }
-
- public IAType findType(String typeName) {
- Datatype type;
- try {
- type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
- } catch (Exception e) {
- throw new IllegalStateException();
- }
- if (type == null) {
- throw new IllegalStateException();
- }
- return type.getDatatype();
- }
-
- public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
- NodeGroup ng;
- try {
- ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- if (ng == null) {
- throw new AlgebricksException("No node group with this name " + nodeGroupName);
- }
- return ng.getNodeNames();
- }
-
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
- public Dataset findDataset(String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
- try {
- return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public void setOutputFile(FileSplit outputFile) {
- this.outputFile = outputFile;
- }
-
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- String datasetName, String targetIdxName) throws AlgebricksException {
- FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
- String[] loc = new String[splits.length];
- for (int p = 0; p < splits.length; p++) {
- loc[p] = splits[p].getNodeName();
- }
- AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
- return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
- }
-
- private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
- throws AlgebricksException {
-
- File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
- Dataset dataset = findDataset(datasetName);
- if (dataset.getDatasetType() != DatasetType.INTERNAL) {
- throw new AlgebricksException("Not an internal dataset");
- }
- List<String> nodeGroup = findNodeGroupNodeNames(dataset.getNodeGroupName());
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- for (int j = 0; j < nodeStores.length; j++) {
- File f = new File(nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f)));
- }
- }
- }
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
- }
-
- public String getRelativePath(String fileName) {
- return dataverseName + File.separator + fileName;
- }
-
- public Map<String, TypeDataGen> getTypeDataGenMap() {
- return typeDataGenMap;
- }
-
- public Map<String, IAType> getTypeDeclarations() {
- return types;
- }
-
- public IAWriterFactory getWriterFactory() {
- return writerFactory;
- }
-
- public MetadataTransactionContext getMetadataTransactionContext() {
- return mdTxnCtx;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
index ad06737..e960336 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
@@ -19,10 +19,6 @@
package org.apache.asterix.metadata.declared;
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
@@ -49,16 +45,6 @@ public class AqlLogicalPlanAndMetadataImpl implements ILogicalPlanAndMetadata {
@Override
public AlgebricksPartitionConstraint getClusterLocations() {
- Map<String, String[]> stores = metadataProvider.getAllStores();
- ArrayList<String> locs = new ArrayList<String>();
- for (String k : stores.keySet()) {
- String[] nodeStores = stores.get(k);
- for (int j = 0; j < nodeStores.length; j++) {
- locs.add(k);
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
+ return metadataProvider.getClusterLocations();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d61d323..745f436 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -91,6 +91,7 @@ import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedUtil;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -148,7 +149,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -160,16 +160,13 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@@ -2095,10 +2092,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return jobId;
}
- public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
- return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
- }
-
public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
throws AlgebricksException {
return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
@@ -2128,7 +2121,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
.getNodeNames();
for (String nd : nodeGroup) {
- numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+ numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
}
return numElementsHint /= numPartitions;
}
@@ -2141,88 +2134,17 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
- return splitProviderAndPartitionConstraints(splits);
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraints(splits);
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
String dataverse) {
- FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
- return splitProviderAndPartitionConstraints(splits);
- }
-
- private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
- FileSplit[] splits) {
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
- String[] loc = new String[splits.length];
- for (int p = 0; p < splits.length; p++) {
- loc[p] = splits[p].getNodeName();
- }
- AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
- return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
- }
-
- private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
- File relPathFile = new File(dataverseName);
- List<FileSplit> splits = new ArrayList<FileSplit>();
- for (Map.Entry<String, String[]> entry : stores.entrySet()) {
- String node = entry.getKey();
- String[] nodeStores = entry.getValue();
- if (nodeStores == null) {
- continue;
- }
- for (int i = 0; i < nodeStores.length; i++) {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(node);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splits.add(new FileSplit(node, new FileReference(f), k));
- }
- }
- }
- }
- return splits.toArray(new FileSplit[] {});
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
}
public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
String targetIdxName, boolean temp) throws AlgebricksException {
- try {
- File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
- .getNodeNames();
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- int numIODevices;
- if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
- numIODevices = 1;
- } else {
- numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
- }
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j]
- + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
- + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), k));
- }
- }
- }
- }
- return splitArray.toArray(new FileSplit[0]);
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
}
private static Map<String, String> initializeAdapterFactoryMapping() {
@@ -2256,10 +2178,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return adapter;
}
- private static String getRelativePath(String dataverseName, String fileName) {
- return dataverseName + File.separator + fileName;
- }
-
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
@@ -2307,19 +2225,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
public AlgebricksPartitionConstraint getClusterLocations() {
- ArrayList<String> locs = new ArrayList<String>();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
+ return AsterixClusterProperties.INSTANCE.getClusterLocations();
}
public IDataFormat getFormat() {
@@ -2334,7 +2240,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
* @return a new map containing the original dataset properties and the
* scheduler/locations
*/
- private Map<String, Object> wrapProperties(Map<String, String> properties) {
+ private static Map<String, Object> wrapProperties(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
// wrappedProperties.put(SCHEDULER, hdfsScheduler);
@@ -2349,7 +2255,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
* the original properties
* @return the new stirng-object map
*/
- private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+ private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
return wrappedProperties;
@@ -2357,58 +2263,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
- FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
- return splitProviderAndPartitionConstraints(splits);
- }
-
- private FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
- String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-
- try {
- File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
- .getNodeNames();
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- // Only the first partition when create
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
- if (create) {
- for (int j = 0; j < nodeStores.length; j++) {
- File f = new File(
- ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), 0));
- }
- } else {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(
- ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), 0));
- }
- }
- }
- }
- }
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+ datasetName, targetIdxName, create);
}
public AsterixStorageProperties getStorageProperties() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
new file mode 100644
index 0000000..5ef58cd
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitsAndConstraintsUtil {
+
+ public static final String PARTITION_DIR_PREFIX = "partition_";
+ public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+ public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+
+ /**
+  * Builds the file splits of a dataverse: exactly one split per storage partition of the
+  * cluster, each located at {@code <storage dir>/partition_<id>/<dataverseName>}.
+  *
+  * @param dataverseName
+  *            name of the dataverse; used as the relative directory inside every partition
+  * @return the splits, in the order the partitions are returned by
+  *         {@code AsterixClusterProperties.getClusterPartitons()}
+  */
+ private static FileSplit[] splitsForDataverse(String dataverseName) {
+     File relPathFile = new File(dataverseName);
+     List<FileSplit> splits = new ArrayList<FileSplit>();
+     //get all partitions
+     ClusterPartition[] clusterPartitions = AsterixClusterProperties.INSTANCE.getClusterPartitons();
+     String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+     // One split per cluster partition. The directory has to be derived from the very partition
+     // the split is created for: looping again over the node's partition count would emit
+     // duplicate splits and pair them with the directories of unrelated partitions.
+     for (ClusterPartition partition : clusterPartitions) {
+         File f = new File(prepareStoragePartitionPath(storageDirName, partition.getPartitionId())
+                 + File.separator + relPathFile);
+         splits.add(getFileSplitForClusterPartition(partition, f));
+     }
+     return splits.toArray(new FileSplit[] {});
+ }
+
+ /**
+  * Computes the file splits of one index of a dataset. For every node of the dataset's node
+  * group, one split is produced per partition of that node (only the first partition when the
+  * dataset belongs to the metadata node group).
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used to look up the dataset and its node group
+  * @param dataverseName
+  *            dataverse of the dataset
+  * @param datasetName
+  *            name of the dataset
+  * @param targetIdxName
+  *            name of the index
+  * @param temp
+  *            when true, the {@code TEMP_DATASETS_STORAGE_FOLDER} directory is inserted after
+  *            the partition directory
+  * @return the splits of the index
+  * @throws AlgebricksException
+  *             if the node group has no node names, or a {@code MetadataException} occurs
+  */
+ public static FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+         String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+     try {
+         File indexRelPath = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+         Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+         List<String> nodeNames = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                 .getNodeNames();
+         if (nodeNames == null) {
+             throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+         }
+
+         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+         // optional "temp" path segment, identical for every split
+         String tempSegment = temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "";
+         List<FileSplit> result = new ArrayList<FileSplit>();
+         for (String node : nodeNames) {
+             int partitionCount = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(node);
+             ClusterPartition[] partitionsOfNode = AsterixClusterProperties.INSTANCE.getNodePartitions(node);
+             // per the original author, this case is currently never executed since the
+             // metadata group doesn't exist
+             if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+                 partitionCount = 1;
+             }
+
+             for (int k = 0; k < partitionCount; k++) {
+                 ClusterPartition partition = partitionsOfNode[k];
+                 //format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
+                 String partitionDir = prepareStoragePartitionPath(storageDirName, partition.getPartitionId());
+                 File f = new File(partitionDir + tempSegment + File.separator + indexRelPath);
+                 result.add(getFileSplitForClusterPartition(partition, f));
+             }
+         }
+         return result.toArray(new FileSplit[] {});
+     } catch (MetadataException me) {
+         throw new AlgebricksException(me);
+     }
+ }
+
+ /**
+  * Computes the splits of the files index of a dataset. Every split of a node refers to that
+  * node's first partition: a single split per node when {@code create} is set, otherwise the
+  * same first-partition split repeated once per partition of the node.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used to look up the dataset and its node group
+  * @param dataverseName
+  *            dataverse of the dataset
+  * @param datasetName
+  *            name of the dataset
+  * @param targetIdxName
+  *            name of the files index
+  * @param create
+  *            whether the splits are requested for creating the index
+  * @return the splits of the files index
+  * @throws AlgebricksException
+  *             if the node group has no node names, or a {@code MetadataException} occurs
+  */
+ private static FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
+         String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+     try {
+         File indexRelPath = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+         Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+         List<String> nodeNames = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                 .getNodeNames();
+         if (nodeNames == null) {
+             throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+         }
+
+         List<FileSplit> result = new ArrayList<FileSplit>();
+         for (String node : nodeNames) {
+             //get node partitions
+             ClusterPartition[] partitionsOfNode = AsterixClusterProperties.INSTANCE.getNodePartitions(node);
+             String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+             // Only the first partition when create; otherwise one copy per partition of the node
+             int copies = create ? 1 : partitionsOfNode.length;
+             for (int c = 0; c < copies; c++) {
+                 ClusterPartition first = partitionsOfNode[0];
+                 File f = new File(prepareStoragePartitionPath(storageDirName, first.getPartitionId())
+                         + File.separator + indexRelPath);
+                 result.add(getFileSplitForClusterPartition(first, f));
+             }
+         }
+         return result.toArray(new FileSplit[] {});
+     } catch (MetadataException me) {
+         throw new AlgebricksException(me);
+     }
+ }
+
+ /**
+  * Returns the split provider and the matching partition constraint for a dataverse.
+  *
+  * @param dataverse
+  *            name of the dataverse
+  * @return the pair built from the dataverse's file splits
+  */
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+         String dataverse) {
+     return splitProviderAndPartitionConstraints(splitsForDataverse(dataverse));
+ }
+
+ /**
+  * Returns the split provider and the matching partition constraint for the files index of a
+  * dataset.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used for the lookups
+  * @param dataverseName
+  *            dataverse of the dataset
+  * @param datasetName
+  *            name of the dataset
+  * @param targetIdxName
+  *            name of the files index
+  * @param create
+  *            whether the splits are requested for creating the index
+  * @return the pair built from the files index splits
+  * @throws AlgebricksException
+  *             if computing the splits fails
+  */
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+         MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
+         boolean create) throws AlgebricksException {
+     return splitProviderAndPartitionConstraints(
+             splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create));
+ }
+
+ /**
+  * Wraps the given splits in a constant split provider and derives an absolute partition
+  * constraint listing the node name of every split, in split order.
+  *
+  * @param splits
+  *            the file splits
+  * @return the split provider paired with its partition constraint
+  */
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+         FileSplit[] splits) {
+     IFileSplitProvider provider = new ConstantFileSplitProvider(splits);
+     String[] nodeNames = new String[splits.length];
+     int idx = 0;
+     for (FileSplit split : splits) {
+         nodeNames[idx++] = split.getNodeName();
+     }
+     AlgebricksPartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(nodeNames);
+     return new Pair<>(provider, constraint);
+ }
+
+ /**
+  * Creates the file split of a cluster partition, located on the partition's active node.
+  *
+  * @param partition
+  *            the partition supplying the active node id, IO device number and partition id
+  * @param relativeFile
+  *            the file the split refers to
+  * @return the new file split
+  */
+ private static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+     String activeNode = partition.getActiveNodeId();
+     FileReference fileRef = new FileReference(relativeFile);
+     int ioDevice = partition.getIODeviceNum();
+     int partitionId = partition.getPartitionId();
+     return new FileSplit(activeNode, fileRef, ioDevice, partitionId);
+ }
+
+ public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
+ return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitonId;
+ }
+
+ /**
+  * Builds the relative path of an index:
+  * {@code <dataverseName><separator><datasetName>_idx_<idxName>}.
+  *
+  * @param dataverseName
+  *            dataverse of the dataset
+  * @param datasetName
+  *            name of the dataset
+  * @param idxName
+  *            name of the index
+  * @return the relative index path
+  */
+ private static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+     String indexFileName = datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+     return dataverseName + File.separator + indexFileName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index f2482da..95eea63 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -33,6 +34,7 @@ import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -50,7 +52,7 @@ public class AsterixClusterProperties {
public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private static final String IO_DEVICES = "iodevices";
-
+ private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
private final Cluster cluster;
@@ -59,6 +61,9 @@ public class AsterixClusterProperties {
private boolean globalRecoveryCompleted = false;
+ private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+ private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
if (is != null) {
@@ -66,37 +71,73 @@ public class AsterixClusterProperties {
JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
Unmarshaller unmarshaller = ctx.createUnmarshaller();
cluster = (Cluster) unmarshaller.unmarshal(is);
-
} catch (JAXBException e) {
throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
}
} else {
cluster = null;
}
+ //if this is the CC process
+ if (AsterixAppContextInfo.getInstance() != null) {
+ if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
+ node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
+ clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+ }
+ }
}
private ClusterState state = ClusterState.UNUSABLE;
public synchronized void removeNCConfiguration(String nodeId) {
+ updateNodePartitions(nodeId, false);
ncConfiguration.remove(nodeId);
- if (ncConfiguration.keySet().size() != AsterixAppContextInfo.getInstance().getMetadataProperties()
- .getNodeNames().size()) {
- state = ClusterState.UNUSABLE;
- LOGGER.info("Cluster now is in UNSABLE state");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Removing configuration parameters for node id " + nodeId);
}
- resetClusterPartitionConstraint();
+ //TODO implement fault tolerance as follows:
+ //1. collect the partitions of the failed NC
+ //2. For each partition, request a remote replica to take over.
+ //3. wait until each remote replica completes the recovery for the lost partitions
+ //4. update the cluster state
}
public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
ncConfiguration.put(nodeId, configuration);
- if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
- .getNodeNames().size()) {
- state = ClusterState.ACTIVE;
- }
+ updateNodePartitions(nodeId, true);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Registering configuration parameters for node id " + nodeId);
}
- resetClusterPartitionConstraint();
+ }
+
+ /**
+  * Marks all partitions of a node as active or inactive, then refreshes the cluster partition
+  * constraint and the cluster state. Nodes without partitions are ignored.
+  *
+  * @param nodeId
+  *            id of the node that joined or left
+  * @param added
+  *            true when the node joined, false when it left
+  */
+ private synchronized void updateNodePartitions(String nodeId, boolean added) {
+     ClusterPartition[] partitionsOfNode = node2PartitionsMap.get(nodeId);
+     //if this isn't a storage node, it will not have cluster partitions
+     if (partitionsOfNode == null) {
+         return;
+     }
+     // a departed node leaves its partitions without an active node
+     String activeNodeId = added ? nodeId : null;
+     for (ClusterPartition partition : partitionsOfNode) {
+         partition.setActive(added);
+         partition.setActiveNodeId(activeNodeId);
+     }
+     resetClusterPartitionConstraint();
+     updateClusterState();
+ }
+
+ /**
+  * Recomputes the cluster state from the partitions: the cluster is UNUSABLE as soon as one
+  * partition is inactive, and ACTIVE only when every partition is active.
+  */
+ private synchronized void updateClusterState() {
+     for (ClusterPartition p : clusterPartitions.values()) {
+         if (!p.isActive()) {
+             state = ClusterState.UNUSABLE;
+             // message spells the state exactly like the enum constant so log searches find it
+             LOGGER.info("Cluster is in UNUSABLE state");
+             return;
+         }
+     }
+     //if all storage partitions are active, then the cluster is active
+     state = ClusterState.ACTIVE;
+     LOGGER.info("Cluster is now ACTIVE");
}
/**
@@ -162,20 +203,14 @@ public class AsterixClusterProperties {
}
private synchronized void resetClusterPartitionConstraint() {
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- ArrayList<String> locs = new ArrayList<String>();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
+ ArrayList<String> clusterActiveLocations = new ArrayList<>();
+ for (ClusterPartition p : clusterPartitions.values()) {
+ if (p.isActive()) {
+ clusterActiveLocations.add(p.getActiveNodeId());
}
}
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+ clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ clusterActiveLocations.toArray(new String[] {}));
}
public boolean isGlobalRecoveryCompleted() {
@@ -194,7 +229,34 @@ public class AsterixClusterProperties {
return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
}
- public static int getNumberOfNodes(){
+ public static int getNumberOfNodes() {
return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
}
+
+ public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+ return node2PartitionsMap.get(nodeId);
+ }
+
+ /**
+  * Returns the number of partitions of a node.
+  *
+  * @param node
+  *            id of the node
+  * @return the partition count, or 0 when the node is not known to the partition map
+  */
+ public synchronized int getNodePartitionsCount(String node) {
+     return node2PartitionsMap.containsKey(node) ? node2PartitionsMap.get(node).length : 0;
+ }
+
+ /**
+  * Returns all partitions of the cluster as a new array, in the iteration order of the
+  * partition map's values.
+  *
+  * @return a fresh array holding every cluster partition
+  */
+ public synchronized ClusterPartition[] getClusterPartitons() {
+     return clusterPartitions.values().toArray(new ClusterPartition[] {});
+ }
+
+ /**
+  * Returns the name of the storage directory: the store configured in the cluster
+  * description when one was loaded, otherwise the default name.
+  *
+  * @return the storage directory name
+  */
+ public String getStorageDirectoryName() {
+     //virtual cluster without cluster config file
+     if (cluster == null) {
+         return DEFAULT_STORAGE_DIR_NAME;
+     }
+     return cluster.getStore();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 58bc44e..794f867 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 48ebff3..06a1957 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,12 +47,12 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 23eb2be..f2a6820 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 6e0394a..8d838a3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index 0566367..15224e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -33,7 +33,7 @@ public class PersistentLocalResourceFactory implements ILocalResourceFactory {
}
@Override
- public LocalResource createLocalResource(long resourceId, String resourceName, int partition) {
- return new LocalResource(resourceId, resourceName, partition, resourceType, localResourceMetadata);
+ public LocalResource createLocalResource(long resourceId, String resourceName, int partition, String resourcePath) {
+ return new LocalResource(resourceId, resourceName, partition, resourcePath, resourceType, localResourceMetadata);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 8ae3eb1..52fd806 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -64,8 +64,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
- public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
- throws HyracksDataException {
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
mountPoints = new String[devices.size()];
this.nodeId = nodeId;
for (int i = 0; i < mountPoints.length; i++) {
@@ -123,7 +122,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
- storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+ storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+ storageRootDirPath);
insert(rootLocalResource);
LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
}
@@ -131,13 +131,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
@Override
- public LocalResource getResourceByName(String name) throws HyracksDataException {
- LocalResource resource = resourceCache.getIfPresent(name);
+ public LocalResource getResourceByPath(String path) throws HyracksDataException {
+ LocalResource resource = resourceCache.getIfPresent(path);
if (resource == null) {
- File resourceFile = getLocalResourceFileByName(name);
+ File resourceFile = getLocalResourceFileByName(path);
if (resourceFile.exists()) {
resource = readLocalResource(resourceFile);
- resourceCache.put(name, resource);
+ resourceCache.put(path, resource);
}
}
return resource;
@@ -145,13 +145,15 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
@Override
public synchronized void insert(LocalResource resource) throws HyracksDataException {
- File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
+ File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
if (resourceFile.exists()) {
throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+ } else {
+ resourceFile.getParentFile().mkdirs();
}
if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
- resourceCache.put(resource.getResourceName(), resource);
+ resourceCache.put(resource.getResourcePath(), resource);
}
FileOutputStream fos = null;
@@ -182,18 +184,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
- String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
+ String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
createReplicationJob(ReplicationOperation.REPLICATE, filePath);
}
}
}
@Override
- public synchronized void deleteResourceByName(String name) throws HyracksDataException {
- File resourceFile = getLocalResourceFileByName(name);
+ public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+ File resourceFile = getLocalResourceFileByName(resourcePath);
if (resourceFile.exists()) {
resourceFile.delete();
- resourceCache.invalidate(name);
+ resourceCache.invalidate(resourcePath);
//if replication enabled, delete resource from remote replicas
if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
@@ -204,8 +206,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
}
- private static File getLocalResourceFileByName(String resourceName) {
- return new File(resourceName + File.separator + METADATA_FILE_NAME);
+ private static File getLocalResourceFileByName(String resourcePath) {
+ return new File(resourcePath + File.separator + METADATA_FILE_NAME);
}
public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -220,25 +222,21 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
//load all local resources.
- File[] dataverseFileList = storageRootDir.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- resourcesMap.put(localResource.getResourceId(), localResource);
- }
- }
+ File[] partitions = storageRootDir.listFiles();
+ for (File partition : partitions) {
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ resourcesMap.put(localResource.getResourceId(), localResource);
}
}
}
@@ -263,27 +261,23 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
continue;
}
- //traverse all local resources.
- File[] dataverseFileList = storageRootDir.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- maxResourceId = Math.max(maxResourceId,
- localResource.getResourceId());
- }
- }
+ //list the partition directories under the storage root.
+ File[] partitions = storageRootDir.listFiles();
+ for (File partition : partitions) {
+ //traverse all local resources.
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
}
}
}
@@ -305,8 +299,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
}
- String fileName = new String(baseDir + METADATA_FILE_NAME);
- return fileName;
+ return new String(baseDir + METADATA_FILE_NAME);
}
}
@@ -376,6 +369,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
/**
* Deletes physical files of all data verses.
+ *
* @param deleteStorageMetadata
* @throws IOException
*/