Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2019/10/15 16:59:09 UTC

[GitHub] [helix] dasahcc commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

dasahcc commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335069798
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##########
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
         return data;
       }
     });
-    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-    // TODO: Optimize serialization with Jackson
-    // TODO: Or use a better binary serialization protocol
-    // TODO: Consider making this also binary
-    // TODO: Consider an async write for the metadata as well
-    _znRecordClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _numVersions = numVersions;
+    _versionTTL = versionTTL;
   }
 
   /**
    * Constructor that uses a default bucket size.
    * @param zkAddr
    */
   public ZkBucketDataAccessor(String zkAddr) {
-    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
   public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
       throws IOException {
-    // Take the ZNrecord and serialize it (get byte[])
+    final long timestamp = System.currentTimeMillis();
+
+    DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so start with 0
+        return ("0_" + timestamp).getBytes();
+      }
+      // Last write exists, so increment and write it back with a timestamp
+      String lastWriteVersionStr = new String(dataInZk);
+      long lastWriteVersion = extractVersion(lastWriteVersionStr);
+      lastWriteVersion++;
+      return (lastWriteVersion + "_" + timestamp).getBytes();
+    };
+
+    // 1. Increment lastWriteVersion using DataUpdater
+    ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor
+        .doUpdate(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
+    if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+      throw new HelixException(
+          String.format("Failed to write the write version at path: %s!", path));
+    }
+
+    // Successfully reserved a version number
+    byte[] binaryVersion = (byte[]) result._updatedValue;
+    final String timestampedVersion = new String(binaryVersion);
+    final long version = extractVersion(timestampedVersion);
+
+    // 2. Write to the incremented last write version with timestamp for TTL
+    String versionedDataPath = path + "/" + timestampedVersion;
+
+    // Take the ZNRecord and serialize it (get byte[])
     byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
     // Compress the byte[]
     byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
     // Compute N - number of buckets
     int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-    if (tryLock(path)) {
-      try {
-        // Read or initialize metadata and compute the last success version index
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          metadataRecord = new ZNRecord(extractIdFromPath(path));
-        }
-        int lastSuccessIndex =
-            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
-        String dataPath = path + "/" + lastSuccessIndex;
-
-        List<String> paths = new ArrayList<>();
-        List<Object> buckets = new ArrayList<>();
-
-        int ptr = 0;
-        int counter = 0;
-        while (counter < numBuckets) {
-          paths.add(dataPath + "/" + counter);
-          if (counter == numBuckets - 1) {
-            // Special treatment for the last bucket
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-                ptr + compressedRecord.length % _bucketSize));
-          } else {
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
-          }
-          ptr += _bucketSize;
-          counter++;
-        }
-
-        // Do a cleanup of previous data
-        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-          // Clean-up is not critical so upon failure, we log instead of throwing an exception
-          LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
-        }
+    List<String> paths = new ArrayList<>();
+    List<byte[]> buckets = new ArrayList<>();
+
+    int ptr = 0;
+    int counter = 0;
+    while (counter < numBuckets) {
+      paths.add(versionedDataPath + "/" + counter);
+      if (counter == numBuckets - 1) {
+        // Special treatment for the last bucket
+        buckets.add(
+            Arrays.copyOfRange(compressedRecord, ptr, ptr + compressedRecord.length % _bucketSize));
+      } else {
+        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+      }
+      ptr += _bucketSize;
+      counter++;
+    }
 
-        // Do an async set to ZK
-        boolean[] success =
-            _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
-        // Exception and fail the write if any failed
-        for (boolean s : success) {
-          if (!s) {
-            throw new HelixException(
-                String.format("Failed to write the data buckets for path: %s", path));
-          }
-        }
+    // 3. Include the metadata in the batch write
+    Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize),
+        DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
+    byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
+    paths.add(versionedDataPath + "/" + METADATA_KEY);
+    buckets.add(binaryMetadata);
+
+    // Do an async set to ZK
+    boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+    // Exception and fail the write if any failed
+    for (boolean s : success) {
+      if (!s) {
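
 For readers skimming the hunk: the new write path first reserves a version by atomically bumping a "<version>_<timestamp>" string under LAST_WRITE_KEY via the DataUpdater, then ceiling-divides the gzipped record into fixed-size buckets under the versioned path. Below is a minimal, self-contained sketch of that arithmetic; the class and helper names are hypothetical, not code from the PR. Note it uses Math.min for the last bucket where the hunk takes compressedRecord.length % _bucketSize, an expression that evaluates to 0 when the payload length is an exact multiple of the bucket size.

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

 public class BucketWriteSketch {

   // Mirrors the "<version>_<timestamp>" string written under LAST_WRITE_KEY
   static long extractVersion(String timestampedVersion) {
     // e.g. "3_1571155149000" -> 3
     return Long.parseLong(timestampedVersion.split("_")[0]);
   }

   // Ceiling-divides a compressed payload into fixed-size buckets
   static List<byte[]> splitIntoBuckets(byte[] compressedRecord, int bucketSize) {
     int numBuckets = (compressedRecord.length + bucketSize - 1) / bucketSize;
     List<byte[]> buckets = new ArrayList<>(numBuckets);
     for (int i = 0; i < numBuckets; i++) {
       int from = i * bucketSize;
       // Math.min also covers the exact-multiple case, where the hunk's
       // "length % bucketSize" for the last bucket would be 0
       int to = Math.min(from + bucketSize, compressedRecord.length);
       buckets.add(Arrays.copyOfRange(compressedRecord, from, to));
     }
     return buckets;
   }

   public static void main(String[] args) {
     System.out.println(extractVersion("3_1571155149000")); // 3
     for (byte[] b : splitIntoBuckets(new byte[2500], 1024)) {
       System.out.println(b.length); // 1024, 1024, 452
     }
   }
 }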
 
 Review comment:
   Once the persist fails, shall we clean up the succeeded ones?
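
   One best-effort shape such a cleanup could take is sketched below. It assumes the paths, success, path, LOG, and _zkBaseDataAccessor names local to the method in the hunk above, and reuses the same remove(path, AccessOption.PERSISTENT) call the old code used for its pre-write cleanup; this is only an illustration of the question, not what the PR ends up doing:

   // Hypothetical best-effort cleanup before failing the write; not PR code
   boolean anyFailed = false;
   for (boolean s : success) {
     anyFailed |= !s;
   }
   if (anyFailed) {
     for (int i = 0; i < success.length; i++) {
       // Only remove the buckets that were actually written
       if (success[i] && !_zkBaseDataAccessor.remove(paths.get(i), AccessOption.PERSISTENT)) {
         // Keep cleanup best-effort so a remove failure does not mask the
         // original write failure
         LOG.warn("Failed to clean up partially written bucket: {}", paths.get(i));
       }
     }
     throw new HelixException(
         String.format("Failed to write the data buckets for path: %s", path));
   }

   Removing only the entries that succeeded keeps a partially written version from lingering under the versioned path, though since versions are timestamped for TTL-based expiry (per the versionTTL parameter introduced above), leaving the stragglers for that sweep may also be a defensible answer.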
