Posted to user@zookeeper.apache.org by dong ma <ma...@gmail.com> on 2020/05/14 10:06:18 UTC

zookeeper create Znode

Hi, I recently encountered a problem. We use ZooKeeper to manage the status
of some user jobs. For example, when a user job moves from the NEW state to
the RUNNING state, we write `RUNNING` to the /job/states/${jobId} node. At
the same time, we use PathChildrenCache to monitor that directory. We
connect to multiple ZooKeeper clusters, with one cache per cluster, to
monitor job status on each of them. The initialization code is as follows:

// One Curator client and one PathChildrenCache per ZooKeeper cluster.
for (Map.Entry<String, String> zkEntry : zkStringMap.entrySet()) {
    CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString(...)
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(5000, 3))
            .namespace("job/states")
            .build();
    curator.start();

    // With the "job/states" namespace, "/" refers to /job/states.
    PathChildrenCache cache = new PathChildrenCache(curator, "/", true);
    cache.getListenable().addListener(
            new JobStateListener(zkEntry.getKey(), stateHandler));
    cache.start();
}
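Because the client above is built with .namespace("job/states"), every path passed to it is interpreted relative to that namespace. The following is a minimal, self-contained sketch of that prefixing rule. It does not call Curator; NamespacePaths and resolve are hypothetical names used only for illustration, and the job id 42 is made up.

```java
// Sketch only: models how a namespaced client is assumed to prefix paths.
// NamespacePaths and resolve are hypothetical helpers, not Curator API.
final class NamespacePaths {

    // Returns the absolute znode path for a path given relative to a namespace.
    static String resolve(String namespace, String path) {
        String ns = namespace.startsWith("/") ? namespace : "/" + namespace;
        if (path.equals("/")) {
            return ns; // the namespace root itself
        }
        return ns + (path.startsWith("/") ? path : "/" + path);
    }

    public static void main(String[] args) {
        // The cache in the post watches "/", i.e. the namespace root.
        System.out.println(resolve("job/states", "/"));
        // A terminal-state node for a made-up job id.
        System.out.println(resolve("job/states", "/final-42"));
    }
}
```

Under this assumption, the cache watches the children of /job/states, and a node created at "/final-42" through the same client would appear as one of those children.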





if (!newJobStatus.isGloballyTerminalState()) {
    // Non-terminal state: keep the current state in an ephemeral node.
    LOG.info("Job switch state to {}", newJobStatus.toString());
    byte[] data = newJobStatus.toString().getBytes(charset);
    if (jobStateCurator.checkExists().forPath(jobPath) == null) {
        jobStateCurator.create().withMode(CreateMode.EPHEMERAL).forPath(jobPath, data);
    } else {
        jobStateCurator.setData().forPath(jobPath, data);
    }
} else {
    // Terminal state: write a persistent "final-" node, then close the client.
    String finalJobPath = "/final-" + jobId.toString();
    LOG.info("Job switch state to final state {}", newJobStatus.toString());

    jobStateCurator.create().forPath(finalJobPath,
            newJobStatus.toString().getBytes(charset));

    // Poll until the new node is visible to this client.
    int retry = 0;
    while (jobStateCurator.checkExists().forPath(finalJobPath) == null) {
        LOG.info("check {} not exists, retry after 50 ms, times {}", finalJobPath, retry++);
        Thread.sleep(50);
    }

    LOG.info("Success create zk Node in final state for {}", jobId.toString());
    jobStateCurator.close();
}
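The existence check above loops without an upper bound, so it would spin forever if the node never became visible. One possible variation, shown here only as a sketch, is to cap the number of attempts. BoundedPoll, Check and waitUntil are hypothetical names introduced for illustration; the block uses only the JDK and does not touch ZooKeeper.

```java
// Sketch only: a bounded version of the "check, sleep, retry" loop.
// BoundedPoll, Check and waitUntil are hypothetical names, not Curator API.
final class BoundedPoll {

    // A condition that may throw, like a ZooKeeper existence check.
    interface Check {
        boolean ok() throws Exception;
    }

    // Evaluates the check up to maxAttempts times, sleeping sleepMs after each
    // failed attempt. Returns true as soon as the check passes, false otherwise.
    static boolean waitUntil(Check check, int maxAttempts, long sleepMs) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (check.ok()) {
                return true;
            }
            Thread.sleep(sleepMs);
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in condition that passes on the third evaluation.
        boolean visible = waitUntil(() -> ++calls[0] >= 3, 5, 50L);
        System.out.println("visible=" + visible + " after " + calls[0] + " checks");
    }
}
```

In the code above, the condition would be something like `() -> jobStateCurator.checkExists().forPath(finalJobPath) != null`, and the caller would decide what to do when waitUntil returns false.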

Re: zookeeper create Znode

Posted by Cee Tee <c....@gmail.com>.
Did we miss the question?
Regards
Chris





Re: zookeeper create Znode

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
> encountered a problem

You haven't said what the problem is. Also, this is probably more appropriate for user@curator.apache.org

-Jordan
