You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2016/01/14 10:48:29 UTC

svn commit: r1724565 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/ main/java/org/apache/jackrabbit/oak/plugins/document/persistentCac...

Author: thomasm
Date: Thu Jan 14 09:48:29 2016
New Revision: 1724565

URL: http://svn.apache.org/viewvc?rev=1724565&view=rev
Log:
OAK-3727 Broadcasting cache: auto-configuration

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Thu Jan 14 09:48:29 2016
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.UUID;
 
 import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -89,6 +91,16 @@ public class ClusterNodeInfo {
      * @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState
      */
     public static final String STATE = "state";
+    
+    /**
+     * The broadcast id. If the broadcasting cache is used, a new id is set after startup.
+     */
+    public static final String BROADCAST_ID = "broadcastId";
+
+    /**
+     * The broadcast listener (host:port). If the broadcasting cache is used, this is set after startup.
+     */
+    public static final String BROADCAST_LISTENER = "broadcastListener";
 
     public static enum ClusterNodeState {
         NONE,
@@ -776,6 +788,22 @@ public class ClusterNodeInfo {
         renewed = true;
         return true;
     }
+    
+    /**
+     * Update the cluster node info.
+     * 
+     * @param info the map of changes
+     */
+    public void setInfo(Map<String, String> info) {
+        // synchronized, because renewLease is also synchronized 
+        synchronized(this) {
+            UpdateOp update = new UpdateOp("" + id, false);
+            for(Entry<String, String> e : info.entrySet()) {
+                update.set(e.getKey(), e.getValue());
+            }
+            store.findAndUpdate(Collection.CLUSTER_NODES, update);
+        }
+    }
 
     /** for testing purpose only, not to be changed at runtime! */
     void setLeaseTime(long leaseTime) {

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java?rev=1724565&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java Thu Jan 14 09:48:29 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
+
+public class DocumentBroadcastConfig implements DynamicBroadcastConfig {
+    
+    private final DocumentNodeStore documentNodeStore;
+
+    public DocumentBroadcastConfig(DocumentNodeStore documentNodeStore) {
+        this.documentNodeStore = documentNodeStore;
+    }
+
+    @Override
+    public String getConfig() {
+        // currently not implemented
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> getClientInfo() {
+        ArrayList<Map<String, String>> list = new ArrayList<Map<String, String>>();
+        for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore())) {
+            if (!doc.isActive()) {
+                continue;
+            }
+            Object broadcastId = doc.get(DynamicBroadcastConfig.ID);
+            Object listener = doc.get(DynamicBroadcastConfig.LISTENER);
+            if (broadcastId == null || listener == null) {
+                // no id or no listener
+                continue;
+            }
+            Map<String, String> map = new HashMap<String, String>();
+            map.put(DynamicBroadcastConfig.ID, broadcastId.toString());
+            map.put(DynamicBroadcastConfig.LISTENER, listener.toString());
+            list.add(map);
+        }
+        return list;
+    }
+
+    @Override
+    public String connect(Map<String, String> clientInfo) {
+        ClusterNodeInfo info = documentNodeStore.getClusterInfo();
+        info.setInfo(clientInfo);
+        return "" + info.getId();
+    }
+
+    @Override
+    public void disconnect(String id) {
+        // ignore
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Thu Jan 14 09:48:29 2016
@@ -995,7 +995,7 @@ public class DocumentMK {
             return cache;
         }
         
-        private PersistentCache getPersistentCache() {
+        public PersistentCache getPersistentCache() {
             if (persistentCacheURI == null) {
                 return null;
             }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Thu Jan 14 09:48:29 2016
@@ -83,6 +83,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.commons.json.JsopStream;
 import org.apache.jackrabbit.oak.commons.json.JsopWriter;
@@ -502,6 +503,12 @@ public final class DocumentNodeStore
         // on a very busy machine - so as to prevent lease timeout.
         leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
         leaseUpdateThread.start();
+        
+        PersistentCache pc = builder.getPersistentCache();
+        if (pc != null) {
+            DynamicBroadcastConfig broadcastConfig = new DocumentBroadcastConfig(this);
+            pc.setBroadcastConfig(broadcastConfig);
+        }
 
         this.mbean = createMBean();
         LOG.info("Initialized DocumentNodeStore with clusterNodeId: {} ({})", clusterId,

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Thu Jan 14 09:48:29 2016
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
@@ -79,6 +80,7 @@ public class PersistentCache implements
     private Broadcaster broadcaster;
     private ThreadLocal<WriteBuffer> writeBuffer = new ThreadLocal<WriteBuffer>();
     private final byte[] broadcastId;
+    private DynamicBroadcastConfig broadcastConfig;
     
     {
         ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
@@ -94,7 +96,7 @@ public class PersistentCache implements
         LOG.info("start, url={}", url);
         String[] parts = url.split(",");
         String dir = parts[0];
-        String broadcast = null;
+        String broadcast = "tcp:";
         for (String p : parts) {
             if (p.equals("+docs")) {
                 cacheDocs = true;
@@ -193,7 +195,9 @@ public class PersistentCache implements
         if (broadcast == null) {
             return;
         }
-        if (broadcast.equals("inMemory")) {
+        if (broadcast.equals("disabled")) {
+            return;
+        } else if (broadcast.equals("inMemory")) {
             broadcaster = InMemoryBroadcaster.INSTANCE;
         } else if (broadcast.startsWith("udp:")) {
             String config = broadcast.substring("udp:".length(), broadcast.length());
@@ -513,6 +517,17 @@ public class PersistentCache implements
         }
         cache.receive(buff);
     }
+    
+    public DynamicBroadcastConfig getBroadcastConfig() {
+        return broadcastConfig;
+    }
+
+    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+        this.broadcastConfig = broadcastConfig;
+        if (broadcaster != null) {
+            broadcaster.setBroadcastConfig(broadcastConfig);
+        }
+    }
 
     interface GenerationCache {
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java Thu Jan 14 09:48:29 2016
@@ -24,6 +24,13 @@ import java.nio.ByteBuffer;
 public interface Broadcaster {
     
     /**
+     * Change the dynamic broadcasting configuration.
+     * 
+     * @param broadcastConfig the new configuration
+     */
+    void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig);
+    
+    /**
      * Send a message.
      * 
      * @param buff the buffer

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java?rev=1724565&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java Thu Jan 14 09:48:29 2016
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Broadcast configuration. Configuration is dynamic, that means can change over
+ * time. The configuration consists of a list of connected clients. Each client
+ * can connect and disconnect, and therefore allow other clients to connect to
+ * it.
+ */
+public interface DynamicBroadcastConfig {
+    
+    /**
+     * The unique id of this client.
+     */
+    String ID = "broadcastId";
+    
+    /**
+     * The listener address, for example the IP address and port.
+     */
+    String LISTENER = "broadcastListener";
+    
+    /**
+     * Get the global configuration data that is not associated to a specific client.
+     * 
+     * @return the configuration
+     */
+    String getConfig();
+    
+    /**
+     * Get the client info of all connected clients.
+     * 
+     * @return the list of client info maps
+     */
+    List<Map<String, String>> getClientInfo();
+    
+    /**
+     * Announce a new client to others.
+     * 
+     * @param clientInfo the client info
+     * @return a unique id (to be used when disconnecting)
+     */
+    String connect(Map<String, String> clientInfo);
+    
+    /**
+     * Sign off.
+     * 
+     * @param id the unique id
+     */
+    void disconnect(String id);
+
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -51,5 +51,10 @@ public class InMemoryBroadcaster impleme
     public void close() {
         // ignore
     }
+    
+    @Override
+    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+        // not yet implemented
+    }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -27,8 +27,14 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
@@ -46,27 +52,44 @@ public class TCPBroadcaster implements B
     private static final int TIMEOUT = 100;
     private static final int MAX_BUFFER_SIZE = 64;
     private static final AtomicInteger NEXT_ID = new AtomicInteger();
+    private static final Charset UTF8 = Charset.forName("UTF-8");
 
-    private final byte[] key;
     private final int id = NEXT_ID.incrementAndGet();
+
     private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
-    private final ServerSocket serverSocket;
-    private final ArrayList<Client> clients = new ArrayList<Client>();
-    private final Thread discoverThread;
-    private final Thread acceptThread;
-    private final Thread sendThread;
-    private final LinkedBlockingDeque<ByteBuffer> sendBuffer = new LinkedBlockingDeque<ByteBuffer>();
+    private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<String, Client>();
+    private final ArrayBlockingQueue<ByteBuffer> sendBuffer = new ArrayBlockingQueue<ByteBuffer>(MAX_BUFFER_SIZE * 2);
+
+    private volatile DynamicBroadcastConfig broadcastConfig;
+    private ServerSocket serverSocket;
+    private Thread acceptThread;
+    private Thread discoverThread;
+    private Thread sendThread;
+    private String ownListener;
+    private String ownKeyUUID = UUID.randomUUID().toString();
+    private byte[] ownKey = ownKeyUUID.getBytes(UTF8);
+    
     private volatile boolean stop;
     
     public TCPBroadcaster(String config) {
         LOG.info("Init " + config);
-        MessageDigest messageDigest;
+        init(config);
+    }
+    
+    public void init(String config) {
         try {
             String[] parts = config.split(";");
             int startPort = 9800;
             int endPort = 9810;
             String key = "";
-            String[] sendTo = {"sendTo", "localhost"};
+            
+            // for debugging, this will send everything to localhost:
+            // String[] sendTo = {"sendTo", "localhost"};
+            
+            // by default, only the entries in the clusterNodes 
+            // collection are used:
+            String[] sendTo = {"sendTo"};
+            
             for (String p : parts) {
                 if (p.startsWith("ports ")) {
                     String[] ports = p.split(" ");
@@ -79,8 +102,10 @@ public class TCPBroadcaster implements B
                 }
             }                    
             sendTo[0] = null;
-            messageDigest = MessageDigest.getInstance("SHA-256");
-            this.key = messageDigest.digest(key.getBytes("UTF-8"));
+            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+            if (key.length() > 0) {
+                ownKey = messageDigest.digest(key.getBytes(UTF8));
+            }
             IOException lastException = null;
             ServerSocket server = null;
             for (int port = startPort; port <= endPort; port++) {
@@ -96,8 +121,8 @@ public class TCPBroadcaster implements B
                 for (String send : sendTo) {
                     if (send != null && !send.isEmpty()) {
                         try {
-                            Client c = new Client(send, port);
-                            clients.add(c);
+                            Client c = new Client(send, port, ownKey);
+                            clients.put(send + ":" + port, c);
                         } catch (IOException e) {
                             LOG.debug("Cannot connect to " + send + " " + port);
                             // ignore
@@ -144,6 +169,37 @@ public class TCPBroadcaster implements B
         
     }
     
+    @Override
+    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+        this.broadcastConfig = broadcastConfig;
+        HashMap<String, String> clientInfo = new HashMap<String, String>();
+        clientInfo.put(DynamicBroadcastConfig.ID, ownKeyUUID);
+        ServerSocket s = serverSocket;
+        if (s != null) {
+            String address = getLocalAddress();
+            if (address != null) {
+                ownListener = address + ":" + s.getLocalPort();
+                clientInfo.put(DynamicBroadcastConfig.LISTENER, ownListener);
+            }
+        }
+        broadcastConfig.connect(clientInfo);
+    }
+    
+    static String getLocalAddress() {
+        String bind = System.getProperty("oak.tcpBindAddress", null);
+        try {
+            InetAddress address;
+            if (bind != null && !bind.isEmpty()) {
+                address = InetAddress.getByName(bind);
+            } else {
+                address = InetAddress.getLocalHost();
+            }
+            return address.getHostAddress();
+        } catch (UnknownHostException e) {
+            return "";
+        }
+    }
+    
     void accept() {
         while (!stop) {
             try {
@@ -153,9 +209,9 @@ public class TCPBroadcaster implements B
                     public void run() {
                         try {
                             final DataInputStream in = new DataInputStream(socket.getInputStream());
-                            byte[] testKey = new byte[key.length];
+                            byte[] testKey = new byte[ownKey.length];
                             in.readFully(testKey);
-                            if (ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(key)) != 0) {
+                            if (ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(ownKey)) != 0) {
                                 LOG.debug("Key mismatch");
                                 socket.close();
                                 return;
@@ -172,7 +228,6 @@ public class TCPBroadcaster implements B
                                 }
                             }
                         } catch (IOException e) {
-e.printStackTrace();                            
                             // ignore
                         }
                     }
@@ -199,19 +254,59 @@ e.printStackTrace();
     
     void discover() {
         while (!stop) {
-            for (Client c : clients) {
-                c.tryConnect(key);
+            DynamicBroadcastConfig b = broadcastConfig;
+            if (b != null) {
+                readClients(b);
+            }
+            for (Client c : clients.values()) {
+                c.tryConnect();
                 if (stop) {
                     break;
                 }
             }
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+    
+    void readClients(DynamicBroadcastConfig b) {
+        List<Map<String, String>> list = b.getClientInfo();
+        for(Map<String, String> m : list) {
+            String listener = m.get(DynamicBroadcastConfig.LISTENER);
+            String id = m.get(DynamicBroadcastConfig.ID);
+            if (listener.equals(ownListener)) {
+                continue;
+            }
+            // the key is the combination of listener and id,
+            // because the same ip address / port combination
+            // could be there multiple time for some time
+            // (in case there is a old, orphan entry for the same machine)
+            String clientKey = listener + " " + id;
+            Client c = clients.get(clientKey);
+            if (c == null) {
+                int index = listener.lastIndexOf(':');
+                if (index >= 0) {
+                    String host = listener.substring(0, index);
+                    int port = Integer.parseInt(listener.substring(index + 1));
+                    try {
+                        byte[] key = id.getBytes(UTF8);
+                        c = new Client(host, port, key);
+                        clients.put(clientKey, c);
+                    } catch (UnknownHostException e) {
+                        // ignore
+                    }
+                }
+            }
         }
     }
     
     void send() {
         while (!stop) {
             try {
-                ByteBuffer buff = sendBuffer.pollLast(10, TimeUnit.MILLISECONDS);
+                ByteBuffer buff = sendBuffer.poll(10, TimeUnit.MILLISECONDS);
                 if (buff != null && !stop) {
                     sendBuffer(buff);
                 }
@@ -227,16 +322,22 @@ e.printStackTrace();
         b.put(buff);
         b.flip();
         while (sendBuffer.size() > MAX_BUFFER_SIZE) {
-            sendBuffer.pollLast();
+            sendBuffer.poll();
+        }
+        try {
+            sendBuffer.add(b);
+        } catch (IllegalStateException e) {
+            // ignore - might happen once in a while,
+            // if the buffer was not yet full just before, but now
+            // many threads concurrently tried to add
         }
-        sendBuffer.push(b);
     }
     
     private void sendBuffer(ByteBuffer buff) {
         int len = buff.limit();
         byte[] data = new byte[len];
         buff.get(data);
-        for (Client c : clients) {
+        for (Client c : clients.values()) {
             c.send(data);
             if (stop) {
                 break;
@@ -287,12 +388,14 @@ e.printStackTrace();
     }
     
     static class Client {
-        final InetAddress address;
+        final String host;
         final int port;
+        final byte[] key;
         DataOutputStream out;
-        Client(String name, int port) throws UnknownHostException {
-            this.address = InetAddress.getByName(name);
+        Client(String host, int port, byte[] key) throws UnknownHostException {
+            this.host = host;
             this.port = port;
+            this.key = key;
         }
         void send(byte[] data) {
             DataOutputStream o = out;
@@ -314,9 +417,15 @@ e.printStackTrace();
                 }
             }
         }
-        void tryConnect(byte[] key) {
+        void tryConnect() {
             DataOutputStream o = out;
-            if (o != null || address == null) {
+            if (o != null || host == null) {
+                return;
+            }
+            InetAddress address;
+            try {
+                address = InetAddress.getByName(host);
+            } catch (UnknownHostException e1) {
                 return;
             }
             Socket socket = new Socket();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -251,4 +251,9 @@ public class UDPBroadcaster implements B
         return !stop;
     }
 
+    @Override
+    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+        // not yet implemented
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java Thu Jan 14 09:48:29 2016
@@ -143,7 +143,7 @@ public class BroadcastTest {
     
     @Test
     public void broadcastTCP() throws Exception {
-        broadcast("tcp:key 123", 90);
+        broadcast("tcp:sendTo localhost;key 123", 90);
     }
 
     @Test