Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/05/14 00:06:12 UTC

[asterixdb] 04/20: [NO ISSUE][MTD] Customizable dataset compaction policy in metadata

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit c586d8897a971bf6ad9fa04e9b60aada37b9c247
Author: Dmitry Lychagin <dm...@couchbase.com>
AuthorDate: Fri Apr 10 14:14:59 2020 -0700

    [NO ISSUE][MTD] Customizable dataset compaction policy in metadata
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Allow product extensions to customize how a dataset's
      compaction policy is stored in the metadata
    
    Change-Id: I0216af5eabdf5ff269ba2d3feccf1371d273315b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5224
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
---
 .../DatasetTupleTranslator.java                    | 107 +++++++++++++--------
 1 file changed, 65 insertions(+), 42 deletions(-)

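Editor's note: to illustrate the extension point this change introduces, here is a minimal, hypothetical sketch of a product extension subclassing DatasetTupleTranslator and overriding the two protected hooks added below (readCompactionPolicy / writeCompactionPolicy). This code is not part of the commit; the class name ProductDatasetTupleTranslator, the policy name "product-default-policy", and the import locations for DatasetType and OrderedListBuilder are assumptions, and the constructor is omitted because the superclass constructor does not appear in this diff.

// Hypothetical product extension -- not part of this patch.
package com.example.product.metadata;

import java.util.Collections;
import java.util.Map;

import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.metadata.entitytupletranslators.DatasetTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;

public class ProductDatasetTupleTranslator extends DatasetTupleTranslator {

    // Constructor omitted: a real extension would mirror the superclass
    // constructor, which is not shown in this diff.

    @Override
    protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType,
            ARecord datasetRecord) {
        // Example customization: ignore whatever is stored for EXTERNAL datasets
        // and report a product-specific policy; otherwise keep default behavior.
        if (datasetType == DatasetType.EXTERNAL) {
            return new Pair<>("product-default-policy", Collections.emptyMap());
        }
        return super.readCompactionPolicy(datasetType, datasetRecord);
    }

    @Override
    protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy,
            Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder,
            ArrayBackedValueStorage itemValue) throws HyracksDataException {
        // Example customization: rewrite the policy name for EXTERNAL datasets and
        // delegate the actual serialization of fields 6/7 to the base class.
        String effectivePolicy =
                datasetType == DatasetType.EXTERNAL ? "product-default-policy" : compactionPolicy;
        super.writeCompactionPolicy(datasetType, effectivePolicy, compactionPolicyProperties,
                listBuilder, itemValue);
    }
}

A product would register such a translator wherever its metadata extension supplies tuple translators; that wiring is outside the scope of this patch.
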
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index dae6152..74f5076 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -25,6 +25,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -68,6 +69,7 @@ import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -135,20 +137,9 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
         String nodeGroupName =
                 ((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX))
                         .getStringValue();
-        String compactionPolicy = ((AString) datasetRecord
-                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
-        IACursor cursor = ((AOrderedList) datasetRecord
-                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX))
-                        .getCursor();
-        Map<String, String> compactionPolicyProperties = new LinkedHashMap<>();
-        String key;
-        String value;
-        while (cursor.next()) {
-            ARecord field = (ARecord) cursor.get();
-            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
-            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
-            compactionPolicyProperties.put(key, value);
-        }
+
+        Pair<String, Map<String, String>> compactionPolicy = readCompactionPolicy(datasetType, datasetRecord);
+
         switch (datasetType) {
             case INTERNAL: {
                 ARecord datasetDetailsRecord = (ARecord) datasetRecord
@@ -159,7 +150,7 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
                 PartitioningStrategy partitioningStrategy = PartitioningStrategy.valueOf(((AString) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX))
                                 .getStringValue());
-                cursor = ((AOrderedList) datasetDetailsRecord
+                IACursor cursor = ((AOrderedList) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX))
                                 .getCursor();
                 List<List<String>> partitioningKey = new ArrayList<>();
@@ -220,15 +211,15 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
                 String adapter = ((AString) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX))
                                 .getStringValue();
-                cursor = ((AOrderedList) datasetDetailsRecord
+                IACursor cursor = ((AOrderedList) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX))
                                 .getCursor();
                 Map<String, String> properties = new HashMap<>();
                 while (cursor.next()) {
                     ARecord field = (ARecord) cursor.get();
-                    key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+                    String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
                             .getStringValue();
-                    value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+                    String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
                             .getStringValue();
                     properties.put(key, value);
                 }
@@ -262,10 +253,34 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
         String compressionScheme = getCompressionScheme(datasetRecord);
 
         return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
-                nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
+                nodeGroupName, compactionPolicy.first, compactionPolicy.second, datasetDetails, hints, datasetType,
                 datasetId, pendingOp, rebalanceCount, compressionScheme);
     }
 
+    protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) {
+
+        String compactionPolicy = ((AString) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
+        AOrderedList compactionPolicyPropertiesList = ((AOrderedList) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX));
+
+        Map<String, String> compactionPolicyProperties;
+        if (compactionPolicyPropertiesList.size() > 0) {
+            compactionPolicyProperties = new LinkedHashMap<>();
+            for (IACursor cursor = compactionPolicyPropertiesList.getCursor(); cursor.next();) {
+                ARecord field = (ARecord) cursor.get();
+                String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+                        .getStringValue();
+                String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+                        .getStringValue();
+                compactionPolicyProperties.put(key, value);
+            }
+        } else {
+            compactionPolicyProperties = Collections.emptyMap();
+        }
+        return new Pair<>(compactionPolicy, compactionPolicyProperties);
+    }
+
     private long getRebalanceCount(ARecord datasetRecord) {
         // Read the rebalance count if there is one.
         int rebalanceCountIndex =
@@ -342,29 +357,9 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX, fieldValue);
 
-        // write field 6
-        fieldValue.reset();
-        aString.setValue(dataset.getCompactionPolicy());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
-
-        // write field 7
-        listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
-                .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
-        if (dataset.getCompactionPolicyProperties() != null) {
-            for (Map.Entry<String, String> property : dataset.getCompactionPolicyProperties().entrySet()) {
-                String name = property.getKey();
-                String value = property.getValue();
-                itemValue.reset();
-                DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
-                        MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
-                listBuilder.addItem(itemValue);
-            }
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
-                fieldValue);
+        // write field 6/7
+        writeCompactionPolicy(dataset.getDatasetType(), dataset.getCompactionPolicy(),
+                dataset.getCompactionPolicyProperties(), listBuilder, itemValue);
 
         // write field 8/9
         fieldValue.reset();
@@ -414,6 +409,34 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
         return tuple;
     }
 
+    protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder,
+            ArrayBackedValueStorage itemValue) throws HyracksDataException {
+        // write field 6
+        fieldValue.reset();
+        aString.setValue(compactionPolicy);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
+
+        // write field 7
+        listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
+                .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
+        if (compactionPolicyProperties != null && !compactionPolicyProperties.isEmpty()) {
+            for (Map.Entry<String, String> property : compactionPolicyProperties.entrySet()) {
+                String name = property.getKey();
+                String value = property.getValue();
+                itemValue.reset();
+                DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
+                        MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
+                listBuilder.addItem(itemValue);
+            }
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
+                fieldValue);
+    }
+
     /**
      * Keep protected to allow other extensions to add additional fields
      *