Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:42 UTC
[07/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index 581d01c,0000000..0ac4f56
mode 100644,000000..100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@@ -1,277 -1,0 +1,282 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.utils;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
++import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.CompactionPolicy;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class DatasetUtils {
+ public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException {
+ List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+ IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ // Get comparators for RID fields.
+ for (int i = 0; i < partitioningKeys.size(); i++) {
+ try {
+ bcfs[i] = IndexingConstants.getComparatorFactory(i);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ } else {
+ for (int i = 0; i < partitioningKeys.size(); i++) {
+ IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+ }
+ }
+ return bcfs;
+ }
+
+ public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AlgebricksException("not implemented");
+ }
+ List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+ int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
+ for (int i = 0; i < partitioningKeys.size(); ++i) {
+ bloomFilterKeyFields[i] = i;
+ }
+ return bloomFilterKeyFields;
+ }
+
+ public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
+ IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AlgebricksException("not implemented");
+ }
+ List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+ IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
+ for (int i = 0; i < partitioningKeys.size(); i++) {
+ IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
+ }
+ return bhffs;
+ }
+
- public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType)
- throws AlgebricksException {
- return computeTupleTypeTraits(dataset, itemType, null);
- }
-
+ public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
+ throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AlgebricksException("not implemented");
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ int numKeys = partitioningKeys.size();
+ ITypeTraits[] typeTraits;
+ if (metaItemType != null) {
+ typeTraits = new ITypeTraits[numKeys + 2];
+ List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+ typeTraits[numKeys + 1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
+ for (int i = 0; i < numKeys; i++) {
+ IAType keyType;
+ if (indicator.get(i) == 0) {
+ keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ } else {
+ keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
+ }
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ } else {
+ typeTraits = new ITypeTraits[numKeys + 1];
+ for (int i = 0; i < numKeys; i++) {
+ IAType keyType;
+ keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ }
+ typeTraits[numKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ return typeTraits;
+ }
+
+ public static List<List<String>> getPartitioningKeys(Dataset dataset) {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ }
+ return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+ }
+
+ public static List<String> getFilterField(Dataset dataset) {
+ return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
+ }
+
+ public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
+ ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
- throws AlgebricksException {
++ throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return null;
+ }
+ List<String> filterField = getFilterField(dataset);
+ if (filterField == null) {
+ return null;
+ }
+ IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
+ IAType type = itemType.getSubFieldType(filterField);
+ bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
+ return bcfs;
+ }
+
+ public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
+ throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return null;
+ }
+ List<String> filterField = getFilterField(dataset);
+ if (filterField == null) {
+ return null;
+ }
+ ITypeTraits[] typeTraits = new ITypeTraits[1];
+ IAType type = itemType.getSubFieldType(filterField);
+ typeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
+ return typeTraits;
+ }
+
+ public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return null;
+ }
+
+ List<String> filterField = getFilterField(dataset);
+ if (filterField == null) {
+ return null;
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ int numKeys = partitioningKeys.size();
+
+ int[] filterFields = new int[1];
+ filterFields[0] = numKeys + 1;
+ return filterFields;
+ }
+
+ public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return null;
+ }
+
+ List<String> filterField = getFilterField(dataset);
+ if (filterField == null) {
+ return null;
+ }
+
+ List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+ int valueFields = dataset.hasMetaPart() ? 2 : 1;
+ int[] btreeFields = new int[partitioningKeys.size() + valueFields];
+ for (int i = 0; i < btreeFields.length; ++i) {
+ btreeFields[i] = i;
+ }
+ return btreeFields;
+ }
+
+ public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ for (int i = 0; i < partitioningKeys.size(); i++) {
+ if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+ String policyName = dataset.getCompactionPolicy();
+ CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+ MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+ String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
+ ILSMMergePolicyFactory mergePolicyFactory;
+ try {
+ mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
+ if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
+ ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
+ }
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new AlgebricksException(e);
+ }
+ Map<String, String> properties = dataset.getCompactionPolicyProperties();
+ return new Pair<ILSMMergePolicyFactory, Map<String, String>>(mergePolicyFactory, properties);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
+ throws HyracksDataException {
+ IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ propertyRecordBuilder.reset(recordType);
+ AMutableString aString = new AMutableString("");
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(name);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(0, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(value);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(1, fieldValue);
+
+ propertyRecordBuilder.write(out, true);
+ }
++
++ public static ARecordType getMetaType(AqlMetadataProvider metadataProvider, Dataset dataset)
++ throws AlgebricksException {
++ if (dataset.hasMetaPart()) {
++ return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
++ dataset.getMetaItemTypeName());
++ }
++ return null;
++ }
+}
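Context for the merged DatasetUtils above: computeTupleTypeTraits now lays tuples out as [key 0 .. key n-1, record] for plain datasets and [key 0 .. key n-1, record, meta record] when the dataset has a meta part, with each key's source chosen by its key-source indicator (0 = record, 1 = meta record). A minimal sketch of that indicator-driven resolution, using hypothetical stand-in types rather than the real ARecordType/ITypeTraits machinery:

    import java.util.Arrays;
    import java.util.List;

    final class KeySourceSketch {
        // Hypothetical stand-in for a record schema lookup; illustration only.
        interface Schema {
            String subFieldType(List<String> path);
        }

        // Mirrors the branch in computeTupleTypeTraits: indicator 0 reads the
        // key's type from the record, indicator 1 from the meta record.
        static String resolveKeyType(Schema record, Schema meta,
                                     List<String> key, int indicator) {
            return indicator == 0 ? record.subFieldType(key) : meta.subFieldType(key);
        }

        public static void main(String[] args) {
            Schema record = path -> "string";
            Schema meta = path -> "int64";
            System.out.println(resolveKeyType(record, meta, Arrays.asList("id"), 1)); // int64
        }
    }
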
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
index 51c3802,0000000..0e9aa0c
mode 100644,000000..100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
@@@ -1,59 -1,0 +1,63 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.util;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
++import org.apache.hyracks.control.cc.ClusterControllerService;
++
+/**
+ * Utility class for obtaining information on the set of Hyracks NodeController
+ * processes that are running on a given host.
+ */
+public class AsterixRuntimeUtil {
+
+ public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws Exception {
+ Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
+ Set<String> nodeControllersAtLocation = nodeControllerInfo.get(ipAddress);
+ return nodeControllersAtLocation;
+ }
+
+ public static List<String> getAllNodeControllers() throws Exception {
+ Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
+ List<String> nodeControllers = new ArrayList<String>();
+ for (Set<String> ncCollection : nodeControllersCollection) {
+ nodeControllers.addAll(ncCollection);
+ }
+ return nodeControllers;
+ }
+
+ public static Map<InetAddress, Set<String>> getNodeControllerMap() throws Exception {
+ Map<InetAddress, Set<String>> map = new HashMap<InetAddress, Set<String>>();
+ AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+ return map;
+ }
+
+ public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
- AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
++ ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
++ .getCCApplicationContext().getControllerService();
++ map.putAll(ccs.getIpAddressNodeNameMap());
+ }
+}
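The AsterixRuntimeUtil change above reroutes getNodeControllerMap(Map) through the ClusterControllerService's IP-address-to-node map instead of the CC context. As an aside, the flattening loop in getAllNodeControllers could equivalently be written with Java 8 streams (an illustrative alternative, not part of this commit):

    import java.net.InetAddress;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class NodeMapSketch {
        // Collects every node controller id across all addresses into one list.
        static List<String> flatten(Map<InetAddress, Set<String>> byAddress) {
            return byAddress.values().stream()
                    .flatMap(Set::stream)
                    .collect(Collectors.toList());
        }
    }
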
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 2bf5fa3,0000000..a349e51
mode 100644,000000..100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@@ -1,154 -1,0 +1,154 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+
+public class LSMIndexFileProperties {
+
+ private String fileName;
+ private long fileSize;
+ private String nodeId;
+ private String dataverse;
+ private String idxName;
+ private boolean lsmComponentFile;
+ private String filePath;
+ private boolean requiresAck = false;
+ private long LSNByteOffset;
+ private int partition;
+
+ public LSMIndexFileProperties() {
+ }
+
+ public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
+ long LSNByteOffset, boolean requiresAck) {
+ initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+ }
+
+ public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
+ initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+ }
+
+ public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+ boolean requiresAck) {
+ this.filePath = filePath;
+ this.fileSize = fileSize;
+ this.nodeId = nodeId;
+ this.lsmComponentFile = lsmComponentFile;
+ this.LSNByteOffset = LSNByteOffset;
+ this.requiresAck = requiresAck;
+ }
+
+ public void splitFileName() {
+ String[] tokens = filePath.split(File.separator);
+ int arraySize = tokens.length;
+ this.fileName = tokens[arraySize - 1];
+ this.idxName = tokens[arraySize - 2];
+ this.dataverse = tokens[arraySize - 3];
- this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]);
++ this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
+ }
+
+ public void serialize(OutputStream out) throws IOException {
+ DataOutputStream dos = new DataOutputStream(out);
+ dos.writeUTF(nodeId);
+ dos.writeUTF(filePath);
+ dos.writeLong(fileSize);
+ dos.writeBoolean(lsmComponentFile);
+ dos.writeLong(LSNByteOffset);
+ dos.writeBoolean(requiresAck);
+ }
+
+ public static LSMIndexFileProperties create(DataInput input) throws IOException {
+ String nodeId = input.readUTF();
+ String filePath = input.readUTF();
+ long fileSize = input.readLong();
+ boolean lsmComponentFile = input.readBoolean();
+ long LSNByteOffset = input.readLong();
+ boolean requiresAck = input.readBoolean();
+ LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
+ LSNByteOffset, requiresAck);
+ return fileProp;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDataverse() {
+ return dataverse;
+ }
+
+ public void setDataverse(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ public String getIdxName() {
+ return idxName;
+ }
+
+ public boolean isLSMComponentFile() {
+ return lsmComponentFile;
+ }
+
+ public boolean requiresAck() {
+ return requiresAck;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("File Name: " + fileName + " ");
+ sb.append("File Size: " + fileSize + " ");
+ sb.append("Node ID: " + nodeId + " ");
+ sb.append("Partition: " + partition + " ");
+ sb.append("IDX Name: " + idxName + " ");
+ sb.append("isLSMComponentFile : " + lsmComponentFile + " ");
+ sb.append("Dataverse: " + dataverse);
+ sb.append("LSN Byte Offset: " + LSNByteOffset);
+ return sb.toString();
+ }
+
+ public long getLSNByteOffset() {
+ return LSNByteOffset;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+}
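splitFileName above (together with the typo fix from getPartitonNumFromName to getPartitionNumFromName) relies on a fixed, right-anchored path layout: .../partition/dataverse/index/fileName. A standalone sketch of that tokenization with a hypothetical path (the production code splits on File.separator; "/" is used here so the example runs anywhere):

    final class ReplicaPathSketch {
        public static void main(String[] args) {
            String filePath = "/storage/partition_0/MyDataverse/MyIndex/b_1"; // hypothetical path
            String[] t = filePath.split("/");
            String fileName = t[t.length - 1];      // b_1
            String idxName = t[t.length - 2];       // MyIndex
            String dataverse = t[t.length - 3];     // MyDataverse
            String partitionDir = t[t.length - 4];  // partition_0
            System.out.println(partitionDir + "/" + dataverse + "/" + idxName + "/" + fileName);
        }
    }
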
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-runtime/pom.xml
----------------------------------------------------------------------
diff --cc asterixdb/asterix-runtime/pom.xml
index 6414139,0000000..1914336
mode 100644,000000..100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@@ -1,82 -1,0 +1,72 @@@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.8.9-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-runtime</artifactId>
+ <properties>
+ <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
+ </properties>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-fuzzyjoin</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-transactions</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-core</artifactId>
- <version>[4.0,)</version>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>[4.0,)</version>
- </dependency>
- <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729,0000000..561b144
mode 100644,000000..100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@@ -1,467 -1,0 +1,478 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.replication.AsterixReplicationJob;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class PersistentLocalResourceRepository implements ILocalResourceRepository {
+
+ private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
+ private final String[] mountPoints;
+ private static final String STORAGE_METADATA_DIRECTORY = "asterix_root_metadata";
+ private static final String STORAGE_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
+ private static final long STORAGE_LOCAL_RESOURCE_ID = -4321;
+ public static final String METADATA_FILE_NAME = ".metadata";
+ private final Cache<String, LocalResource> resourceCache;
+ private final String nodeId;
+ private static final int MAX_CACHED_RESOURCES = 1000;
+ private IReplicationManager replicationManager;
+ private boolean isReplicationEnabled = false;
+ private Set<String> filesToBeReplicated;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final Set<Integer> nodeOriginalPartitions;
+ private final Set<Integer> nodeActivePartitions;
+ private Set<Integer> nodeInactivePartitions;
+
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
+ AsterixMetadataProperties metadataProperties) throws HyracksDataException {
+ mountPoints = new String[devices.size()];
+ this.nodeId = nodeId;
+ this.clusterPartitions = metadataProperties.getClusterPartitions();
+ for (int i = 0; i < mountPoints.length; i++) {
+ String mountPoint = devices.get(i).getPath().getPath();
+ File mountPointDir = new File(mountPoint);
+ if (!mountPointDir.exists()) {
+ throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
+ }
+ if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
+ mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
+ } else {
+ mountPoints[i] = new String(mountPoint);
+ }
+ }
+ resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
+
+ ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ //initially the node active partitions are the same as the original partitions
+ nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
+ nodeActivePartitions = new HashSet<>(nodePartitions.length);
+ for (ClusterPartition partition : nodePartitions) {
+ nodeOriginalPartitions.add(partition.getPartitionId());
+ nodeActivePartitions.add(partition.getPartitionId());
+ }
+ }
+
+ private static String getStorageMetadataDirPath(String mountPoint, String nodeId, int ioDeviceId) {
+ return mountPoint + STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
+ }
+
+ private static File getStorageMetadataBaseDir(File storageMetadataFile) {
+ //STORAGE_METADATA_DIRECTORY / Node Id / STORAGE_METADATA_FILE_NAME_PREFIX
+ return storageMetadataFile.getParentFile().getParentFile();
+ }
+
+ public void initializeNewUniverse(String storageRootDirName) throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Initializing local resource repository ... ");
+ }
+
+ //create the storage metadata file (used to locate the root storage directory after instance restarts).
+ //TODO since the cluster configuration file is static and distributed to all NCs, the storage root
+ //directory could be found without this file; unless it eventually stores more information, there is no need to keep it.
+ for (int i = 0; i < mountPoints.length; i++) {
+ File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
+ File storageMetadataDir = storageMetadataFile.getParentFile();
+ //make dirs for the storage metadata file
+ boolean success = storageMetadataDir.mkdirs();
+ if (!success) {
+ throw new IllegalStateException(
+ "Unable to create storage metadata directory of PersistentLocalResourceRepository in "
+ + storageMetadataDir.getAbsolutePath() + " or directory already exists");
+ }
+
+ LOGGER.log(Level.INFO,
+ "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
+
+ String storageRootDirPath;
+ if (storageRootDirName.startsWith(System.getProperty("file.separator"))) {
+ storageRootDirPath = new String(
+ mountPoints[i] + storageRootDirName.substring(System.getProperty("file.separator").length()));
+ } else {
+ storageRootDirPath = new String(mountPoints[i] + storageRootDirName);
+ }
+
+ LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
+ storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+ storageRootDirPath);
+ insert(rootLocalResource);
+ LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
+ }
+ LOGGER.log(Level.INFO, "Completed the initialization of the local resource repository");
+ }
+
+ @Override
+ public LocalResource getResourceByPath(String path) throws HyracksDataException {
+ LocalResource resource = resourceCache.getIfPresent(path);
+ if (resource == null) {
+ File resourceFile = getLocalResourceFileByName(path);
+ if (resourceFile.exists()) {
+ resource = readLocalResource(resourceFile);
+ resourceCache.put(path, resource);
+ }
+ }
+ return resource;
+ }
+
+ @Override
+ public synchronized void insert(LocalResource resource) throws HyracksDataException {
+ File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
+ if (resourceFile.exists()) {
+ throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+ } else {
+ resourceFile.getParentFile().mkdirs();
+ }
+
+ if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+ resourceCache.put(resource.getResourcePath(), resource);
+ }
+
+ try (FileOutputStream fos = new FileOutputStream(resourceFile);
+ ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+ oosToFos.writeObject(resource);
+ oosToFos.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ //if replication enabled, send resource metadata info to remote nodes
+ if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+ String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
+ createReplicationJob(ReplicationOperation.REPLICATE, filePath);
+ }
+ }
+
+ @Override
+ public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+ File resourceFile = getLocalResourceFileByName(resourcePath);
+ if (resourceFile.exists()) {
+ resourceFile.delete();
+ resourceCache.invalidate(resourcePath);
+
+ //if replication enabled, delete resource from remote replicas
+ if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile.getAbsolutePath());
+ }
+ } else {
+ throw new HyracksDataException("Resource doesn't exist");
+ }
+ }
+
+ private static File getLocalResourceFileByName(String resourcePath) {
+ return new File(resourcePath + File.separator + METADATA_FILE_NAME);
+ }
+
+ public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+ //TODO During recovery, memory usage is currently proportional to the number of available resources.
+ //This could be fixed by traversing resources on disk only until the required resource is found.
+ HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
+
+ for (int i = 0; i < mountPoints.length; i++) {
+ File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+ if (storageRootDir == null) {
+ continue;
+ }
+
+ //load all local resources.
+ File[] partitions = storageRootDir.listFiles();
+ for (File partition : partitions) {
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ resourcesMap.put(localResource.getResourceId(), localResource);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return resourcesMap;
+ }
+
+ @Override
+ public long getMaxResourceID() throws HyracksDataException {
+ long maxResourceId = 0;
+
+ for (int i = 0; i < mountPoints.length; i++) {
+ File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+ if (storageRootDir == null) {
+ continue;
+ }
+
+ //load all local resources.
+ File[] partitions = storageRootDir.listFiles();
+ for (File partition : partitions) {
+ //traverse all local resources.
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return maxResourceId;
+ }
+
+ private static String getFileName(String baseDir, long resourceId) {
+ if (resourceId == STORAGE_LOCAL_RESOURCE_ID) {
+ return baseDir;
+ } else {
+ if (!baseDir.endsWith(System.getProperty("file.separator"))) {
+ baseDir += System.getProperty("file.separator");
+ }
+ return new String(baseDir + METADATA_FILE_NAME);
+ }
+ }
+
+ public static LocalResource readLocalResource(File file) throws HyracksDataException {
+ try (FileInputStream fis = new FileInputStream(file);
+ ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
+ LocalResource resource = (LocalResource) oisFromFis.readObject();
+ return resource;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ public void setReplicationManager(IReplicationManager replicationManager) {
+ this.replicationManager = replicationManager;
+ isReplicationEnabled = replicationManager.isReplicationEnabled();
+
+ if (isReplicationEnabled) {
+ filesToBeReplicated = new HashSet<String>();
+ nodeInactivePartitions = ConcurrentHashMap.newKeySet();
+ }
+ }
+
+ private void createReplicationJob(ReplicationOperation operation, String filePath) throws HyracksDataException {
- filesToBeReplicated.clear();
- filesToBeReplicated.add(filePath);
- AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
- ReplicationExecutionType.SYNC, filesToBeReplicated);
- try {
- replicationManager.submitJob(job);
- } catch (IOException e) {
- throw new HyracksDataException(e);
++ /**
++ * Durable resources path format:
++ * /partition/dataverse/idx/fileName
++ * Temporary resources path format:
++ * /partition/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
++ */
++ String[] fileNameTokens = filePath.split(File.separator);
++ String partitionDir = fileNameTokens[fileNameTokens.length - 4];
++ //exclude temporary datasets resources
++ if (!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
++ filesToBeReplicated.clear();
++ filesToBeReplicated.add(filePath);
++ AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
++ ReplicationExecutionType.SYNC, filesToBeReplicated);
++ try {
++ replicationManager.submitJob(job);
++ } catch (IOException e) {
++ throw new HyracksDataException(e);
++ }
+ }
+ }
+
+ public String[] getStorageMountingPoints() {
+ return mountPoints;
+ }
+
+ /**
+ * Deletes the physical files of all dataverses.
+ *
+ * @param deleteStorageMetadata
+ * @throws IOException
+ */
+ public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
+ for (int i = 0; i < mountPoints.length; i++) {
+ File storageDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+ if (storageDir != null) {
+ if (storageDir.isDirectory()) {
+ FileUtils.deleteDirectory(storageDir);
+ }
+ }
+
+ if (deleteStorageMetadata) {
+ //delete the metadata root directory
+ File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
+ File storageMetadataDir = getStorageMetadataBaseDir(storageMetadataFile);
+ if (storageMetadataDir.exists() && storageMetadataDir.isDirectory()) {
+ FileUtils.deleteDirectory(storageMetadataDir);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param mountPoint
+ * @param nodeId
+ * @param ioDeviceId
+ * @return A file reference to the storage metadata file.
+ */
+ private static File getStorageMetadataFile(String mountPoint, String nodeId, int ioDeviceId) {
+ String storageMetadataFileName = getStorageMetadataDirPath(mountPoint, nodeId, ioDeviceId) + File.separator
+ + STORAGE_METADATA_FILE_NAME_PREFIX;
+ File storageMetadataFile = new File(storageMetadataFileName);
+ return storageMetadataFile;
+ }
+
+ /**
+ * @param mountPoint
+ * @param nodeId
+ * @param ioDeviceId
+ * @return A file reference to the storage root directory if exists, otherwise null.
+ * @throws HyracksDataException
+ */
+ public static File getStorageRootDirectoryIfExists(String mountPoint, String nodeId, int ioDeviceId)
+ throws HyracksDataException {
+ File storageRootDir = null;
+ File storageMetadataFile = getStorageMetadataFile(mountPoint, nodeId, ioDeviceId);
+ if (storageMetadataFile.exists()) {
+ LocalResource rootLocalResource = readLocalResource(storageMetadataFile);
+ String storageRootDirPath = (String) rootLocalResource.getResourceObject();
+ Path path = Paths.get(storageRootDirPath);
+ if (Files.exists(path)) {
+ storageRootDir = new File(storageRootDirPath);
+ }
+ }
+ return storageRootDir;
+ }
+
+ /**
+ * @param partition
+ * @return The partition local path on this NC.
+ */
+ public String getPartitionPath(int partition) {
+ //currently each partition is replicated on the same IO device number on all NCs.
+ return mountPoints[getIODeviceNum(partition)];
+ }
+
+ public int getIODeviceNum(int partition) {
+ return clusterPartitions.get(partition).getIODeviceNum();
+ }
+
+ public Set<Integer> getActivePartitions() {
+ return Collections.unmodifiableSet(nodeActivePartitions);
+ }
+
+ public Set<Integer> getInactivePartitions() {
+ return Collections.unmodifiableSet(nodeInactivePartitions);
+ }
+
+ public Set<Integer> getNodeOrignalPartitions() {
+ return Collections.unmodifiableSet(nodeOriginalPartitions);
+ }
+
+ public synchronized void addActivePartition(int partitionId) {
+ nodeActivePartitions.add(partitionId);
+ nodeInactivePartitions.remove(partitionId);
+ }
+
+ public synchronized void addInactivePartition(int partitionId) {
+ nodeInactivePartitions.add(partitionId);
+ nodeActivePartitions.remove(partitionId);
+ }
+
+ /**
+ * @param resourceAbsolutePath
+ * @return the resource relative path starting from the partition directory
+ */
+ public static String getResourceRelativePath(String resourceAbsolutePath) {
+ String[] tokens = resourceAbsolutePath.split(File.separator);
- //partiton/dataverse/idx/fileName
++ //partition/dataverse/idx/fileName
+ return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+ + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+ }
+
+ public static int getResourcePartition(String resourceAbsolutePath) {
+ String[] tokens = resourceAbsolutePath.split(File.separator);
- //partiton/dataverse/idx/fileName
- return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
++ //partition/dataverse/idx/fileName
++ return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
+ }
+}
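The createReplicationJob change above skips temporary-dataset resources by inspecting the fourth-from-last path token, which holds the partition directory for durable resources and the temp-datasets folder for temporary ones. A minimal sketch of that guard, with TEMP_FOLDER standing in for StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER (whose actual value is not shown in this diff):

    final class ReplicationGuardSketch {
        // Hypothetical stand-in for StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER.
        static final String TEMP_FOLDER = "temp";

        // Returns false for resources under the temp-datasets folder, mirroring
        // the exclusion added to createReplicationJob.
        static boolean shouldReplicate(String filePath) {
            String[] tokens = filePath.split("/"); // production code splits on File.separator
            return !tokens[tokens.length - 4].equals(TEMP_FOLDER);
        }

        public static void main(String[] args) {
            System.out.println(shouldReplicate("/storage/partition_0/dv/idx/.metadata"));      // true
            System.out.println(shouldReplicate("/storage/partition_0/temp/dv/idx/.metadata")); // false
        }
    }
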