You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2016/01/14 10:48:29 UTC
svn commit: r1724565 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/
main/java/org/apache/jackrabbit/oak/plugins/document/persistentCac...
Author: thomasm
Date: Thu Jan 14 09:48:29 2016
New Revision: 1724565
URL: http://svn.apache.org/viewvc?rev=1724565&view=rev
Log:
OAK-3727 Broadcasting cache: auto-configuration
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Thu Jan 14 09:48:29 2016
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -89,6 +91,16 @@ public class ClusterNodeInfo {
* @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState
*/
public static final String STATE = "state";
+
+ /**
+ * The broadcast id. If the broadcasting cache is used, a new id is set after startup.
+ */
+ public static final String BROADCAST_ID = "broadcastId";
+
+ /**
+ * The broadcast listener (host:port). If the broadcasting cache is used, this is set after startup.
+ */
+ public static final String BROADCAST_LISTENER = "broadcastListener";
public static enum ClusterNodeState {
NONE,
@@ -776,6 +788,22 @@ public class ClusterNodeInfo {
renewed = true;
return true;
}
+
+ /**
+ * Update the cluster node info.
+ *
+ * @param info the map of changes
+ */
+ public void setInfo(Map<String, String> info) {
+ // synchronized, because renewLease is also synchronized
+ synchronized(this) {
+ UpdateOp update = new UpdateOp("" + id, false);
+ for(Entry<String, String> e : info.entrySet()) {
+ update.set(e.getKey(), e.getValue());
+ }
+ store.findAndUpdate(Collection.CLUSTER_NODES, update);
+ }
+ }
/** for testing purpose only, not to be changed at runtime! */
void setLeaseTime(long leaseTime) {
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java?rev=1724565&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBroadcastConfig.java Thu Jan 14 09:48:29 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
+
+public class DocumentBroadcastConfig implements DynamicBroadcastConfig {
+
+ private final DocumentNodeStore documentNodeStore;
+
+ public DocumentBroadcastConfig(DocumentNodeStore documentNodeStore) {
+ this.documentNodeStore = documentNodeStore;
+ }
+
+ @Override
+ public String getConfig() {
+ // currently not implemented
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> getClientInfo() {
+ ArrayList<Map<String, String>> list = new ArrayList<Map<String, String>>();
+ for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore())) {
+ if (!doc.isActive()) {
+ continue;
+ }
+ Object broadcastId = doc.get(DynamicBroadcastConfig.ID);
+ Object listener = doc.get(DynamicBroadcastConfig.LISTENER);
+ if (broadcastId == null || listener == null) {
+ // no id or no listener
+ continue;
+ }
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(DynamicBroadcastConfig.ID, broadcastId.toString());
+ map.put(DynamicBroadcastConfig.LISTENER, listener.toString());
+ list.add(map);
+ }
+ return list;
+ }
+
+ @Override
+ public String connect(Map<String, String> clientInfo) {
+ ClusterNodeInfo info = documentNodeStore.getClusterInfo();
+ info.setInfo(clientInfo);
+ return "" + info.getId();
+ }
+
+ @Override
+ public void disconnect(String id) {
+ // ignore
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Thu Jan 14 09:48:29 2016
@@ -995,7 +995,7 @@ public class DocumentMK {
return cache;
}
- private PersistentCache getPersistentCache() {
+ public PersistentCache getPersistentCache() {
if (persistentCacheURI == null) {
return null;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Thu Jan 14 09:48:29 2016
@@ -83,6 +83,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.commons.json.JsopStream;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
@@ -502,6 +503,12 @@ public final class DocumentNodeStore
// on a very busy machine - so as to prevent lease timeout.
leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
leaseUpdateThread.start();
+
+ PersistentCache pc = builder.getPersistentCache();
+ if (pc != null) {
+ DynamicBroadcastConfig broadcastConfig = new DocumentBroadcastConfig(this);
+ pc.setBroadcastConfig(broadcastConfig);
+ }
this.mbean = createMBean();
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {} ({})", clusterId,
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Thu Jan 14 09:48:29 2016
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
@@ -79,6 +80,7 @@ public class PersistentCache implements
private Broadcaster broadcaster;
private ThreadLocal<WriteBuffer> writeBuffer = new ThreadLocal<WriteBuffer>();
private final byte[] broadcastId;
+ private DynamicBroadcastConfig broadcastConfig;
{
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
@@ -94,7 +96,7 @@ public class PersistentCache implements
LOG.info("start, url={}", url);
String[] parts = url.split(",");
String dir = parts[0];
- String broadcast = null;
+ String broadcast = "tcp:";
for (String p : parts) {
if (p.equals("+docs")) {
cacheDocs = true;
@@ -193,7 +195,9 @@ public class PersistentCache implements
if (broadcast == null) {
return;
}
- if (broadcast.equals("inMemory")) {
+ if (broadcast.equals("disabled")) {
+ return;
+ } else if (broadcast.equals("inMemory")) {
broadcaster = InMemoryBroadcaster.INSTANCE;
} else if (broadcast.startsWith("udp:")) {
String config = broadcast.substring("udp:".length(), broadcast.length());
@@ -513,6 +517,17 @@ public class PersistentCache implements
}
cache.receive(buff);
}
+
+ public DynamicBroadcastConfig getBroadcastConfig() {
+ return broadcastConfig;
+ }
+
+ public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+ this.broadcastConfig = broadcastConfig;
+ if (broadcaster != null) {
+ broadcaster.setBroadcastConfig(broadcastConfig);
+ }
+ }
interface GenerationCache {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/Broadcaster.java Thu Jan 14 09:48:29 2016
@@ -24,6 +24,13 @@ import java.nio.ByteBuffer;
public interface Broadcaster {
/**
+ * Change the dynamic broadcasting configuration.
+ *
+ * @param broadcastConfig the new configuration
+ */
+ void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig);
+
+ /**
* Send a message.
*
* @param buff the buffer
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java?rev=1724565&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/DynamicBroadcastConfig.java Thu Jan 14 09:48:29 2016
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Broadcast configuration. Configuration is dynamic, that means can change over
+ * time. The configuration consists of a list of connected clients. Each client
+ * can connect and disconnect, and therefore allow other clients to connect to
+ * it.
+ */
+public interface DynamicBroadcastConfig {
+
+ /**
+ * The unique id of this client.
+ */
+ String ID = "broadcastId";
+
+ /**
+ * The listener address, for example the IP address and port.
+ */
+ String LISTENER = "broadcastListener";
+
+ /**
+ * Get the global configuration data that is not associated to a specific client.
+ *
+ * @return the configuration
+ */
+ String getConfig();
+
+ /**
+ * Get the client info of all connected clients.
+ *
+ * @return the list of client info maps
+ */
+ List<Map<String, String>> getClientInfo();
+
+ /**
+ * Announce a new client to others.
+ *
+ * @param clientInfo the client info
+ * @return a unique id (to be used when disconnecting)
+ */
+ String connect(Map<String, String> clientInfo);
+
+ /**
+ * Sign off.
+ *
+ * @param id the unique id
+ */
+ void disconnect(String id);
+
+}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/InMemoryBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -51,5 +51,10 @@ public class InMemoryBroadcaster impleme
public void close() {
// ignore
}
+
+ @Override
+ public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+ // not yet implemented
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -27,8 +27,14 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -46,27 +52,44 @@ public class TCPBroadcaster implements B
private static final int TIMEOUT = 100;
private static final int MAX_BUFFER_SIZE = 64;
private static final AtomicInteger NEXT_ID = new AtomicInteger();
+ private static final Charset UTF8 = Charset.forName("UTF-8");
- private final byte[] key;
private final int id = NEXT_ID.incrementAndGet();
+
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
- private final ServerSocket serverSocket;
- private final ArrayList<Client> clients = new ArrayList<Client>();
- private final Thread discoverThread;
- private final Thread acceptThread;
- private final Thread sendThread;
- private final LinkedBlockingDeque<ByteBuffer> sendBuffer = new LinkedBlockingDeque<ByteBuffer>();
+ private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<String, Client>();
+ private final ArrayBlockingQueue<ByteBuffer> sendBuffer = new ArrayBlockingQueue<ByteBuffer>(MAX_BUFFER_SIZE * 2);
+
+ private volatile DynamicBroadcastConfig broadcastConfig;
+ private ServerSocket serverSocket;
+ private Thread acceptThread;
+ private Thread discoverThread;
+ private Thread sendThread;
+ private String ownListener;
+ private String ownKeyUUID = UUID.randomUUID().toString();
+ private byte[] ownKey = ownKeyUUID.getBytes(UTF8);
+
private volatile boolean stop;
public TCPBroadcaster(String config) {
LOG.info("Init " + config);
- MessageDigest messageDigest;
+ init(config);
+ }
+
+ public void init(String config) {
try {
String[] parts = config.split(";");
int startPort = 9800;
int endPort = 9810;
String key = "";
- String[] sendTo = {"sendTo", "localhost"};
+
+ // for debugging, this will send everything to localhost:
+ // String[] sendTo = {"sendTo", "localhost"};
+
+ // by default, only the entries in the clusterNodes
+ // collection are used:
+ String[] sendTo = {"sendTo"};
+
for (String p : parts) {
if (p.startsWith("ports ")) {
String[] ports = p.split(" ");
@@ -79,8 +102,10 @@ public class TCPBroadcaster implements B
}
}
sendTo[0] = null;
- messageDigest = MessageDigest.getInstance("SHA-256");
- this.key = messageDigest.digest(key.getBytes("UTF-8"));
+ MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+ if (key.length() > 0) {
+ ownKey = messageDigest.digest(key.getBytes(UTF8));
+ }
IOException lastException = null;
ServerSocket server = null;
for (int port = startPort; port <= endPort; port++) {
@@ -96,8 +121,8 @@ public class TCPBroadcaster implements B
for (String send : sendTo) {
if (send != null && !send.isEmpty()) {
try {
- Client c = new Client(send, port);
- clients.add(c);
+ Client c = new Client(send, port, ownKey);
+ clients.put(send + ":" + port, c);
} catch (IOException e) {
LOG.debug("Cannot connect to " + send + " " + port);
// ignore
@@ -144,6 +169,37 @@ public class TCPBroadcaster implements B
}
+ @Override
+ public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+ this.broadcastConfig = broadcastConfig;
+ HashMap<String, String> clientInfo = new HashMap<String, String>();
+ clientInfo.put(DynamicBroadcastConfig.ID, ownKeyUUID);
+ ServerSocket s = serverSocket;
+ if (s != null) {
+ String address = getLocalAddress();
+ if (address != null) {
+ ownListener = address + ":" + s.getLocalPort();
+ clientInfo.put(DynamicBroadcastConfig.LISTENER, ownListener);
+ }
+ }
+ broadcastConfig.connect(clientInfo);
+ }
+
+ static String getLocalAddress() {
+ String bind = System.getProperty("oak.tcpBindAddress", null);
+ try {
+ InetAddress address;
+ if (bind != null && !bind.isEmpty()) {
+ address = InetAddress.getByName(bind);
+ } else {
+ address = InetAddress.getLocalHost();
+ }
+ return address.getHostAddress();
+ } catch (UnknownHostException e) {
+ return "";
+ }
+ }
+
void accept() {
while (!stop) {
try {
@@ -153,9 +209,9 @@ public class TCPBroadcaster implements B
public void run() {
try {
final DataInputStream in = new DataInputStream(socket.getInputStream());
- byte[] testKey = new byte[key.length];
+ byte[] testKey = new byte[ownKey.length];
in.readFully(testKey);
- if (ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(key)) != 0) {
+ if (ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(ownKey)) != 0) {
LOG.debug("Key mismatch");
socket.close();
return;
@@ -172,7 +228,6 @@ public class TCPBroadcaster implements B
}
}
} catch (IOException e) {
-e.printStackTrace();
// ignore
}
}
@@ -199,19 +254,59 @@ e.printStackTrace();
void discover() {
while (!stop) {
- for (Client c : clients) {
- c.tryConnect(key);
+ DynamicBroadcastConfig b = broadcastConfig;
+ if (b != null) {
+ readClients(b);
+ }
+ for (Client c : clients.values()) {
+ c.tryConnect();
if (stop) {
break;
}
}
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ void readClients(DynamicBroadcastConfig b) {
+ List<Map<String, String>> list = b.getClientInfo();
+ for(Map<String, String> m : list) {
+ String listener = m.get(DynamicBroadcastConfig.LISTENER);
+ String id = m.get(DynamicBroadcastConfig.ID);
+ if (listener.equals(ownListener)) {
+ continue;
+ }
+ // the key is the combination of listener and id,
+ // because the same ip address / port combination
+ // could be there multiple time for some time
+ // (in case there is a old, orphan entry for the same machine)
+ String clientKey = listener + " " + id;
+ Client c = clients.get(clientKey);
+ if (c == null) {
+ int index = listener.lastIndexOf(':');
+ if (index >= 0) {
+ String host = listener.substring(0, index);
+ int port = Integer.parseInt(listener.substring(index + 1));
+ try {
+ byte[] key = id.getBytes(UTF8);
+ c = new Client(host, port, key);
+ clients.put(clientKey, c);
+ } catch (UnknownHostException e) {
+ // ignore
+ }
+ }
+ }
}
}
void send() {
while (!stop) {
try {
- ByteBuffer buff = sendBuffer.pollLast(10, TimeUnit.MILLISECONDS);
+ ByteBuffer buff = sendBuffer.poll(10, TimeUnit.MILLISECONDS);
if (buff != null && !stop) {
sendBuffer(buff);
}
@@ -227,16 +322,22 @@ e.printStackTrace();
b.put(buff);
b.flip();
while (sendBuffer.size() > MAX_BUFFER_SIZE) {
- sendBuffer.pollLast();
+ sendBuffer.poll();
+ }
+ try {
+ sendBuffer.add(b);
+ } catch (IllegalStateException e) {
+ // ignore - might happen once in a while,
+ // if the buffer was not yet full just before, but now
+ // many threads concurrently tried to add
}
- sendBuffer.push(b);
}
private void sendBuffer(ByteBuffer buff) {
int len = buff.limit();
byte[] data = new byte[len];
buff.get(data);
- for (Client c : clients) {
+ for (Client c : clients.values()) {
c.send(data);
if (stop) {
break;
@@ -287,12 +388,14 @@ e.printStackTrace();
}
static class Client {
- final InetAddress address;
+ final String host;
final int port;
+ final byte[] key;
DataOutputStream out;
- Client(String name, int port) throws UnknownHostException {
- this.address = InetAddress.getByName(name);
+ Client(String host, int port, byte[] key) throws UnknownHostException {
+ this.host = host;
this.port = port;
+ this.key = key;
}
void send(byte[] data) {
DataOutputStream o = out;
@@ -314,9 +417,15 @@ e.printStackTrace();
}
}
}
- void tryConnect(byte[] key) {
+ void tryConnect() {
DataOutputStream o = out;
- if (o != null || address == null) {
+ if (o != null || host == null) {
+ return;
+ }
+ InetAddress address;
+ try {
+ address = InetAddress.getByName(host);
+ } catch (UnknownHostException e1) {
return;
}
Socket socket = new Socket();
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java Thu Jan 14 09:48:29 2016
@@ -251,4 +251,9 @@ public class UDPBroadcaster implements B
return !stop;
}
+ @Override
+ public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
+ // not yet implemented
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java?rev=1724565&r1=1724564&r2=1724565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java Thu Jan 14 09:48:29 2016
@@ -143,7 +143,7 @@ public class BroadcastTest {
@Test
public void broadcastTCP() throws Exception {
- broadcast("tcp:key 123", 90);
+ broadcast("tcp:sendTo localhost;key 123", 90);
}
@Test