You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2019/10/10 23:05:36 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

jiajunwang commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333757470
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##########
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
         return data;
       }
     });
-    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-    // TODO: Optimize serialization with Jackson
-    // TODO: Or use a better binary serialization protocol
-    // TODO: Consider making this also binary
-    // TODO: Consider an async write for the metadata as well
-    _znRecordClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _numVersions = numVersions;
   }
 
   /**
    * Constructor that uses a default bucket size.
    * @param zkAddr
    */
   public ZkBucketDataAccessor(String zkAddr) {
-    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+    this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
       throws IOException {
+    AtomicLong versionRef = new AtomicLong();
+    DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so start with 0
+        return Longs.toByteArray(0);
+      }
+      // Last write exists, so increment and write it back
+      long lastWriteVersion = Longs.fromByteArray(dataInZk);
+      lastWriteVersion++;
+      // Set the AtomicReference
+      versionRef.set(lastWriteVersion);
+      return Longs.toByteArray(lastWriteVersion);
+    };
+
+    // 1. Increment lastWriteVersion using DataUpdater
+    if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater,
 
 Review comment:
   Can we try to use ZkBaseDataAccessor.doUpdate() so you get the new updated value for free?
   Then you don't need versionRef.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org