Posted to user@spark.apache.org by 王 纯超 <wa...@outlook.com> on 2018/05/07 08:38:18 UTC

Watch Zookeeper in Spark Closure

Hi,

I want to listen for Zookeeper node changes inside a Spark closure so that behavior can be changed dynamically. To do this I use the PathChildrenCache class provided by Curator, but somehow the changes are not captured.
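
For reference, the bare Curator watch pattern that my classes below wrap looks roughly like the following minimal standalone sketch (the connect string and path here are placeholders, not my real configuration):

// Minimal standalone sketch of a PathChildrenCache watch; connect string and path are placeholders.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class WatchSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        PathChildrenCache cache = new PathChildrenCache(client, "/rtm/keys", true);
        cache.getListenable().addListener((cf, event) -> {
            if(event.getData() != null){ // connection events carry no child data
                System.out.println(event.getType() + " -> " + new String(event.getData().getData()));
            }
        });
        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        Thread.sleep(Long.MAX_VALUE); // keep the JVM alive to receive events
    }
}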

main function in the closure
public class DpiScenarioBasedFilter implements FlatMapFunction<Iterator<DPIFlow>, Tuple2<String, String>> {
...
    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<DPIFlow> iter) throws Exception {
        if(!scenarioRuleManager.isRegistered()){ // scenarioRuleManager is an instance of the ScenarioRuleManager class, which is responsible for listening to Zookeeper
            synchronized(scenarioRuleManager){
                if(!scenarioRuleManager.isRegistered()){
                    ZkRuleManager.getInstance(zkConfig).register(scenarioRuleManager);
                    scenarioRuleManager.setRegistered(true);
                }
            }
        }
        List<Tuple2<String, String>> list = new LinkedList<Tuple2<String, String>>();
        Map<String, IRule> rules = scenarioRuleManager.getRules();

        while(iter.hasNext()){
            DPIFlow df = iter.next();
            Key k = df.getKey();
            String isdn = k.getIsdn();
            String protocol = k.getProtocol();

            list.add(new Tuple2<String, String>(rules.size() + "", isdn));
        }
        return list.iterator();
    }
...
}

Zookeeper listener
public class ScenarioRuleManager implements IRuleChangeListener, Serializable {

    private ConcurrentHashMap<String, IRule> rules = new ConcurrentHashMap<String, IRule>();

    private volatile boolean registered = false;

    @Override
    public void onCreate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule); // 'rule' comes from code elided in this snippet
    }

    @Override
    public void onDelete(String key) { // declared in IRuleChangeListener
        rules.remove(key);
    }

    @Override
    public void onUpdate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule); // see note above
    }
}
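
(IRuleChangeListener itself is not shown above; inferred from the overrides, it is presumably something like the sketch below, though the real interface may carry more context, since onCreate/onUpdate also use a rule value.)

// Presumed shape of IRuleChangeListener, inferred from the overrides above; not the actual source.
public interface IRuleChangeListener {
    void onCreate(String key);
    void onDelete(String key);
    void onUpdate(String key);
}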

Zookeeper listener management
public class ZkRuleManager implements Closeable {
    private static volatile ZkRuleManager INSTANCE = null;
    private static Object _lock = new Object();

    private CuratorFramework _curator;
    private ZkConfig _zkConfig;

    private String zkRoot = "/rtm/keys";

    private ExecutorService es = Executors.newCachedThreadPool();
    private IRuleChangeListener listener;
    private PathChildrenCache childCache;

    private ZkRuleManager(ZkConfig zkConfig) {
        _zkConfig = zkConfig;
        _curator = ZkClientHolder.getZkClient(zkConfig);
        _curator.start();
        init();
        childCache = new PathChildrenCache(_curator, zkRoot, true);
        try{
            childCache.start(StartMode.BUILD_INITIAL_CACHE);
        }catch(Exception e){
            throw new RuntimeException(e);
        }
    }

    private void init() {
        try{
            createZkRoot();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public void register(final IRuleChangeListener listener) {
        childCache.getListenable().addListener(new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent pcce) throws Exception {
                switch(pcce.getType()){
                    case CHILD_ADDED:
                        listener.onCreate(new String(pcce.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        listener.onDelete(new String(pcce.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        listener.onUpdate(new String(pcce.getData().getData()));
                        break;
                    default:
                        break;
                }
            }
        }, es);

    }

    private void createZkRoot() throws Exception {
        if(_curator.checkExists().forPath(zkRoot) == null){
            _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkRoot);
        }
    }

    public static ZkRuleManager getInstance(ZkConfig zkConfig) {
        if(INSTANCE == null){
            synchronized(_lock){
                if(INSTANCE == null){
                    INSTANCE = new ZkRuleManager(zkConfig);
                }
            }
        }
        return INSTANCE;
    }

    public CuratorFramework getCurator() {
        return _curator;
    }

    public void writeRule(String ruleKey) throws Exception {
        String path = ZKPaths.makePath(zkRoot, ruleKey);
        if(_curator.checkExists().forPath(path) == null){ // node does not exist
            _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, ruleKey.getBytes());
        }else{
            _curator.setData().forPath(path, ruleKey.getBytes());
        }
    }

    @Override
    public void close() {
        try{
            childCache.close();
        }catch(IOException e){
            e.printStackTrace();
        }
        es.shutdown(); // also stop the listener executor
        _curator.close();
    }
}
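
For testing outside Spark, an event can be triggered by creating or updating a child under /rtm/keys with plain Curator, mirroring writeRule above (a minimal sketch; the connect string and rule key are placeholders):

// Self-contained trigger for CHILD_ADDED / CHILD_UPDATED events; placeholder values only.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;

public class RuleWriter {
    public static void main(String[] args) throws Exception {
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String path = ZKPaths.makePath("/rtm/keys", "rule-1"); // "rule-1" is a placeholder key
        if(client.checkExists().forPath(path) == null){
            client.create().creatingParentsIfNeeded()
                  .withMode(CreateMode.PERSISTENT).forPath(path, "rule-1".getBytes()); // CHILD_ADDED
        }else{
            client.setData().forPath(path, "rule-1".getBytes()); // CHILD_UPDATED
        }
        client.close();
    }
}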
What's the problem?
I am not quite clear about Spark's mechanics. I know that the tasks (defined in DpiScenarioBasedFilter in this case) are executed on worker nodes after instances of the class are deserialized. How many instances are there? One per task? Are they equal but distinct objects within each JVM? If so, does their shared field (scenarioRuleManager) refer to a single object or to different ones?
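
To make the question concrete, a small self-contained probe along these lines (the class and names here are illustrative, not part of my job) would print which JVM and which deserialized copy of the function handled each partition:

// Probe: one line per partition showing the executor JVM (pid@host) and the identity of
// the deserialized function instance that ran the task. Illustrative code, not my actual job.
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class InstanceProbe implements FlatMapFunction<Iterator<Integer>, String> {
    @Override
    public Iterator<String> call(Iterator<Integer> it) {
        String jvm = ManagementFactory.getRuntimeMXBean().getName(); // pid@host of the executor
        String copy = "instance@" + System.identityHashCode(this);   // which deserialized copy ran this task
        return Collections.singletonList(jvm + " / " + copy).iterator();
    }

    public static void main(String[] args) {
        // local[4] runs everything in the driver JVM; on a real cluster there is one JVM per executor.
        SparkConf conf = new SparkConf().setAppName("instance-probe").setMaster("local[4]");
        try(JavaSparkContext sc = new JavaSparkContext(conf)){
            sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8)
              .mapPartitions(new InstanceProbe())
              .collect()
              .forEach(System.out::println);
        }
    }
}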

Could anyone help me?
________________________________
wangchunchao@outlook.com