You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by "Marcosrico (via GitHub)" <gi...@apache.org> on 2023/05/30 22:30:16 UTC

[GitHub] [helix] Marcosrico opened a new pull request, #2515: Distributed Semaphore Implementation

Marcosrico opened a new pull request, #2515:
URL: https://github.com/apache/helix/pull/2515

   ### Issues
   
   - [ ] My PR addresses the following Helix issues and references them in the PR description:
   
   (#200 - Link your issue number here: You can write "Fixes #XXX". Please use the proper keyword so that the issue gets closed automatically. See https://docs.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue
   Any of the following keywords can be used: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved)
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI changes:
   
   (Write a concise description including what, why, how)
   
   ### Tests
   
   - [ ] The following tests are written for this issue:
   
   (List the names of added unit/integration tests)
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   (If CI test fails due to known issue, please specify the issue and test PR locally. Then copy & paste the result of "mvn test" to here.)
   
   ### Changes that Break Backward Compatibility (Optional)
   
   - My PR contains changes that break backward compatibility or previous assumptions for certain methods or API. They include:
   
   (Consider including all behavior changes for public methods or API. Also include these changes in merge description so that other developers are aware of these changes. This allows them to make relevant code changes in feature branches accounting for the new method/API behavior.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1228631420


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -75,8 +133,19 @@ public Permit acquire() {
    * @param count number of permits to acquire
    * @return a collection of permits
    */
-  public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      LOG.warn("No sufficient permits available. Attempt to acquire {} permits, but only {} permits available", count,
+          getRemainingCapacity());
+      return null;
+    } else {
+      updateAcquirePermit(count);

Review Comment:
   After an offline discussion we agreed that it was necessary to implement some sort of anti race condition that prevents two clients from modifying a semaphore at the same time / in between processes. We came up with multiple solutions but ultimately agreed that we would check for the available capacity in the update call rather than having it separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1214642055


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }

Review Comment:
   If there is already a entry on that path, then constructor still succeed but no actual entry is created. We should differentiate these 2 cases.
   It might be a problem for consumer if the semaphore is not created. Then  consumer will create one semaphore which it should not be creating.
   
   Another thing to consider is that, both consumer and producer is going to use this client to create semaphore and acquire permit. So do we want to have entry creation integrated in the constructor?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1227211115


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -96,30 +165,40 @@ public Collection<Permit> acquire(int count, long timeout, TimeUnit unit) {
    * @return remaining capacity
    */
   public long getRemainingCapacity() {
-    throw new NotImplementedException("Not implemented yet.");
+    return getSemaphore().getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY);
   }
 
   /**
    * Get the semaphore data record
    * @return semaphore data record
    */
   private DataRecord getSemaphore() {
-    throw new NotImplementedException("Not implemented yet.");
+    if (_metaClient.exists(_path) == null) {
+      throw new MetaClientException("Semaphore does not exist at path: " + _path + ". Please create it first.");
+    }
+    return new DataRecord(_metaClient.get(_path));
   }
 
   /**
    * Return a permit. If the permit is already returned, log and return void.
    */
-  public void returnPermit(Permit permit) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized void returnPermit(Permit permit) {

Review Comment:
   Why we need synchronized?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1214642055


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }

Review Comment:
   If there is already a entry on that path, then constructor still succeed but no actual entry is created. I feel like we do not need to check "_metaClient.exists(path)". Just create the node, if it fails then throw exception in constructor. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1234516858


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   That is a good point. The only concern I have is wether there is any case other than the client being already created that would throw an error? If not, then I agree we should catch it (would have to make the change for lockclient too).
   Looking into the code there seems to be a method in native zkclient "isConnectionClosed()" but not implemented in zkMetaClient. If we import that method we can call it to check whether the client is already connected or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on PR #2515:
URL: https://github.com/apache/helix/pull/2515#issuecomment-1601125587

   LGTM. Good PR!
   Please address the exception related comments in following change. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rahulrane50 commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "rahulrane50 (via GitHub)" <gi...@apache.org>.
rahulrane50 commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1237649061


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -75,8 +133,19 @@ public Permit acquire() {
    * @param count number of permits to acquire
    * @return a collection of permits
    */
-  public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      LOG.warn("No sufficient permits available. Attempt to acquire {} permits, but only {} permits available", count,
+          getRemainingCapacity());
+      return null;
+    } else {
+      updateAcquirePermit(count);

Review Comment:
   I meant different count values for acquire.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1214642055


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }

Review Comment:
   If there is already a entry on that path, then constructor still succeed but no actual entry is created. We should differentiate these 2 cases.
   
   Another thing to consider is that, both consumer and producer is going to use this client to create semaphore and acquire permit. So do we want to have entry creation integrated in the constructor?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1230022991


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -50,23 +80,51 @@ public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
    * @param capacity capacity of the semaphore
    */
   public void createSemaphore(String path, int capacity) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (capacity <= 0) {
+      throw new MetaClientException("Capacity must be positive");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to create semaphore");
+    }
+    if (_metaClient.exists(path) != null) {
+      throw new MetaClientException("Semaphore already exists");
+    }
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore.
    * @param path path of the semaphore
    */
   public void connectSemaphore(String path) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to connect semaphore");
+    }
+    if (_metaClient.exists(path) == null) {
+      throw new MetaClientException("Semaphore does not exist");
+    }
+    _path = path;
   }
 
   /**
    * Acquire a permit. If no permit is available, log error and return null.
    * @return a permit
    */
   public Permit acquire() {
-    throw new NotImplementedException("Not implemented yet.");
+    int count = 1;

Review Comment:
   Can be inline



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1213816081


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }
+  }
+
+  public DistributedSemaphore(MetaClientInterface<DataRecord> client, String path) {
+    _metaClient = client;
+    _path = path;
+    _metaClient.connect();
+  }
+
+  /**
+   * Acquire a permit. If no permit is available, block until when it was able to acquire.
+   * @return a permit
+   */
+  public Permit acquire() {
+    if (getRemainingCapacity() > 0) {
+      updateAcquirePermit(1);
+      return retrievePermit(_path);
+    } else {
+      throw new MetaClientException("No sufficient permits available");
+    }
+  }
+
+
+  /**
+   * Try to acquire a permit. If not enough permit is available, wait for a specific time or return when it was able to acquire.
+   * @param count number of permits to acquire
+   * @return a collection of permits
+   */
+  public Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      throw new MetaClientException("No sufficient permits available");
+    } else {
+      updateAcquirePermit(count);
+      Collection<Permit> permits = new ArrayList<>();
+      for (int i = 0; i < count; i++) {
+        permits.add(retrievePermit(_path));
+      }
+      return permits;
+    }
+  }
+
+  /**
+   * Try to acquire a permit. If no enough permit is available, wait for a specific time or return when it was able to acquire.
+   * If timeout <=0, then return immediately when not able to acquire.
+   * @param count number of permits to acquire
+   * @param timeout time to wait
+   * @param unit time unit
+   * @return a collection of permits
+   */
+  public Collection<Permit> tryAcquire(int count, long timeout, TimeUnit unit) {
+    throw new MetaClientException("Not implemented yet");

Review Comment:
   We should have a `Not implemented` exception type.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1240493807


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -75,8 +133,19 @@ public Permit acquire() {
    * @param count number of permits to acquire
    * @return a collection of permits
    */
-  public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      LOG.warn("No sufficient permits available. Attempt to acquire {} permits, but only {} permits available", count,
+          getRemainingCapacity());
+      return null;
+    } else {
+      updateAcquirePermit(count);

Review Comment:
   We had an discussion offline. The client won't store the number of left permit in memory. Client uses `update` to update remaining permit on ZK, so there won't be race condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1234393323


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   If we connect an already connected client it will throw an exception



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1234516858


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   That is a good point. The only concern I have is wether there is any case other than the client being already created that would throw an error? If not, then I agree we should catch it (would have to make the change for lockclient too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1213816608


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }
+  }
+
+  public DistributedSemaphore(MetaClientInterface<DataRecord> client, String path) {
+    _metaClient = client;
+    _path = path;
+    _metaClient.connect();
+  }
+
+  /**
+   * Acquire a permit. If no permit is available, block until when it was able to acquire.
+   * @return a permit
+   */
+  public Permit acquire() {
+    if (getRemainingCapacity() > 0) {
+      updateAcquirePermit(1);
+      return retrievePermit(_path);
+    } else {
+      throw new MetaClientException("No sufficient permits available");
+    }
+  }
+
+
+  /**
+   * Try to acquire a permit. If not enough permit is available, wait for a specific time or return when it was able to acquire.
+   * @param count number of permits to acquire
+   * @return a collection of permits
+   */
+  public Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      throw new MetaClientException("No sufficient permits available");
+    } else {
+      updateAcquirePermit(count);
+      Collection<Permit> permits = new ArrayList<>();
+      for (int i = 0; i < count; i++) {
+        permits.add(retrievePermit(_path));
+      }
+      return permits;
+    }
+  }
+
+  /**
+   * Try to acquire a permit. If no enough permit is available, wait for a specific time or return when it was able to acquire.

Review Comment:
   Oh i see, then I must have gotten both acquire and tryAcquire mixed up. Should acquire then be blocking?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1232782943


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   What if the client is already connected?  Right now `connect()` in zkMetaClient is not re-entrant 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1228666731


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -96,30 +165,40 @@ public Collection<Permit> acquire(int count, long timeout, TimeUnit unit) {
    * @return remaining capacity
    */
   public long getRemainingCapacity() {
-    throw new NotImplementedException("Not implemented yet.");
+    return getSemaphore().getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY);
   }
 
   /**
    * Get the semaphore data record
    * @return semaphore data record
    */
   private DataRecord getSemaphore() {
-    throw new NotImplementedException("Not implemented yet.");
+    if (_metaClient.exists(_path) == null) {
+      throw new MetaClientException("Semaphore does not exist at path: " + _path + ". Please create it first.");
+    }
+    return new DataRecord(_metaClient.get(_path));
   }
 
   /**
    * Return a permit. If the permit is already returned, log and return void.
    */
-  public void returnPermit(Permit permit) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized void returnPermit(Permit permit) {

Review Comment:
   Good catch, we don't. Synchronization addressed above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1235654497


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   Checked the code, there is only one exception thrown for existing client "illegalstateexception" so I caught that. Should be good now. Thanks for the check!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1234515614


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   If the connection is already established, then constructor throws exception and return. User won't be able to create using existing connected metaclient. 
   Do you think we can catch exception and continue? It will allow user to reuse an existing client. Otherwise there is no need to provide this constructor. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1232782943


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   What is the client is already connected?  Right now `connect()` in zkMetaClient is not re-entrant 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1213815975


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }
+  }
+
+  public DistributedSemaphore(MetaClientInterface<DataRecord> client, String path) {
+    _metaClient = client;
+    _path = path;
+    _metaClient.connect();
+  }
+
+  /**
+   * Acquire a permit. If no permit is available, block until when it was able to acquire.
+   * @return a permit
+   */
+  public Permit acquire() {
+    if (getRemainingCapacity() > 0) {
+      updateAcquirePermit(1);
+      return retrievePermit(_path);
+    } else {
+      throw new MetaClientException("No sufficient permits available");
+    }
+  }
+
+
+  /**
+   * Try to acquire a permit. If not enough permit is available, wait for a specific time or return when it was able to acquire.
+   * @param count number of permits to acquire
+   * @return a collection of permits
+   */
+  public Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      throw new MetaClientException("No sufficient permits available");
+    } else {
+      updateAcquirePermit(count);
+      Collection<Permit> permits = new ArrayList<>();
+      for (int i = 0; i < count; i++) {
+        permits.add(retrievePermit(_path));
+      }
+      return permits;
+    }
+  }
+
+  /**
+   * Try to acquire a permit. If no enough permit is available, wait for a specific time or return when it was able to acquire.

Review Comment:
   tryAcquire should not be a blocking call -> meaning we dont wait but just return directly. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1214763047


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }

Review Comment:
   Good point. After offline discussion, I will separate this PR into two, with another one being just api and permit. Thanks for the review!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1230309014


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -50,23 +80,51 @@ public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
    * @param capacity capacity of the semaphore
    */
   public void createSemaphore(String path, int capacity) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (capacity <= 0) {
+      throw new MetaClientException("Capacity must be positive");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to create semaphore");
+    }
+    if (_metaClient.exists(path) != null) {
+      throw new MetaClientException("Semaphore already exists");
+    }
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore.
    * @param path path of the semaphore
    */
   public void connectSemaphore(String path) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to connect semaphore");
+    }
+    if (_metaClient.exists(path) == null) {
+      throw new MetaClientException("Semaphore does not exist");
+    }
+    _path = path;
   }
 
   /**
    * Acquire a permit. If no permit is available, log error and return null.
    * @return a permit
    */
   public Permit acquire() {
-    throw new NotImplementedException("Not implemented yet.");
+    int count = 1;

Review Comment:
   Okay thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1239076560


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -50,23 +87,50 @@ public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
    * @param capacity capacity of the semaphore
    */
   public void createSemaphore(String path, int capacity) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (capacity <= 0) {
+      throw new MetaClientException("Capacity must be positive");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to create semaphore");
+    }
+    if (_metaClient.exists(path) != null) {
+      throw new MetaClientException("Semaphore already exists");
+    }
+    _path = path;

Review Comment:
   Correct I'll fix that, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1235818420


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java:
##########
@@ -64,8 +64,12 @@ public LockClient(MetaClientInterface<LockInfo> client) {
       throw new IllegalArgumentException("MetaClient cannot be null.");
     }
     _metaClient = client;
-    LOG.info("Connecting to existing MetaClient for LockClient");
-    _metaClient.connect();
+    try {
+      LOG.info("Connecting to existing MetaClient for LockClient");
+      _metaClient.connect();
+    } catch (IllegalStateException e) {
+      // Ignore as it either has already been connected or already been closed.

Review Comment:
   One question. If the client is already closed, what exception do metaclient throw? If it is also IllegalStateException, we should considering differentiate calling `connect` on already connected client or on already closed client. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu merged pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu merged PR #2515:
URL: https://github.com/apache/helix/pull/2515


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1213815438


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.

Review Comment:
   The comment is outdated I guess? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1214665351


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private final String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
+
+  public DistributedSemaphore(MetaClientConfig config, String path, int capacity) {
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+
+    _path = path;
+    if (_metaClient.exists(path) == null) {
+      DataRecord dataRecord = new DataRecord(path);
+      dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+      dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+      _metaClient.create(path, dataRecord);
+    }

Review Comment:
    We can think about having constructor only for connection, and a getSemaphore and createSemaphore functions. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1237440805


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -75,8 +133,19 @@ public Permit acquire() {
    * @param count number of permits to acquire
    * @return a collection of permits
    */
-  public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      LOG.warn("No sufficient permits available. Attempt to acquire {} permits, but only {} permits available", count,
+          getRemainingCapacity());
+      return null;
+    } else {
+      updateAcquirePermit(count);

Review Comment:
   Could you please elaborate a bit more on "caller may get different values". What does `value` refer to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1237029704


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java:
##########
@@ -64,8 +64,12 @@ public LockClient(MetaClientInterface<LockInfo> client) {
       throw new IllegalArgumentException("MetaClient cannot be null.");
     }
     _metaClient = client;
-    LOG.info("Connecting to existing MetaClient for LockClient");
-    _metaClient.connect();
+    try {
+      LOG.info("Connecting to existing MetaClient for LockClient");
+      _metaClient.connect();
+    } catch (IllegalStateException e) {
+      // Ignore as it either has already been connected or already been closed.

Review Comment:
   Will create a todo for the next PR to address this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] rahulrane50 commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "rahulrane50 (via GitHub)" <gi...@apache.org>.
rahulrane50 commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1237409721


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -50,23 +87,50 @@ public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
    * @param capacity capacity of the semaphore
    */
   public void createSemaphore(String path, int capacity) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (capacity <= 0) {
+      throw new MetaClientException("Capacity must be positive");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to create semaphore");
+    }
+    if (_metaClient.exists(path) != null) {
+      throw new MetaClientException("Semaphore already exists");
+    }
+    _path = path;

Review Comment:
   Shouldn't we do this after line 104? What if creation of znode fails?



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -75,8 +133,19 @@ public Permit acquire() {
    * @param count number of permits to acquire
    * @return a collection of permits
    */
-  public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+  public synchronized Collection<Permit> acquire(int count) {
+    if (getRemainingCapacity() < count) {
+      LOG.warn("No sufficient permits available. Attempt to acquire {} permits, but only {} permits available", count,
+          getRemainingCapacity());
+      return null;
+    } else {
+      updateAcquirePermit(count);

Review Comment:
   I'm still trying to understand how current implementation is atomic? We check and update znode with remaining capacity in updateAcquirePermit (nit :i feel it should be renamed to checkAcquistionAndUpdatePermit or something) and then update in-memory object in retrievePermit() but it's not atomic right? There still could be race conditions here. For ex there are two acquire calls with different numbers then caller may get different values based on order of execution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on PR #2515:
URL: https://github.com/apache/helix/pull/2515#issuecomment-1601301852

   PR approved by @xyuanlu 
   Commit message: Lattice MetaClient Distributed Semaphore Implementation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] Marcosrico commented on a diff in pull request #2515: Distributed Semaphore Implementation

Posted by "Marcosrico (via GitHub)" <gi...@apache.org>.
Marcosrico commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1234516858


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   That is a good point. The only concern I have is wether there is any case other than the client being already created that would throw an error? If not, then I agree we should catch it (would have to make the change for lockclient too).
   Looking into the code there seems to be a method in native zkclient "isClosed()" but not implemented in zkMetaClient. If we import that method we can call it to check whether the client is already connected or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org