You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by rj...@apache.org on 2009/06/09 14:50:47 UTC
svn commit: r782980 [4/8] - in
/directory/sandbox/slp/src/main/java/org/apache/directory/slp: ./ codec/
extensions/ impl/ impl/da/ impl/filter/ messages/
Modified: directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPCore.java
URL: http://svn.apache.org/viewvc/directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPCore.java?rev=782980&r1=782979&r2=782980&view=diff
==============================================================================
--- directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPCore.java (original)
+++ directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPCore.java Tue Jun 9 12:50:45 2009
@@ -19,7 +19,6 @@
*/
package org.apache.directory.slp.impl;
-
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -62,8 +61,7 @@
/**
* the core class of the jSLP implementation.
- * <code>ch.ethz.iks.slp.ServiceLocationManager</code> inherits from this
- * class.
+ * <code>ch.ethz.iks.slp.ServiceLocationManager</code> inherits from this class.
*
* @see ch.ethz.iks.slp.impl.ServiceLocationManager
* @author Jan S. Rellermeyer
@@ -71,7 +69,8 @@
public class SLPCore {
private static volatile boolean isInitialized = false;
- private static volatile boolean isListening = false; // is the UA listening for adverts?
+ private static volatile boolean isListening = false; // is the UA listening
+ // for adverts?
private static volatile boolean isSAInitialized = false;
private static volatile boolean isDAInitialized = false;
@@ -90,7 +89,7 @@
/**
* the reserved (standard) port.
*/
- public static final int SLP_RESERVED_PORT = 427;
+ public static final int SLP_RESERVED_PORT = 427;
/**
* the standard SLP multicast address.
@@ -127,7 +126,7 @@
* configured to perform no DA discovery ?
*/
static final boolean noDiscovery;
- static String discoveryException="";
+ static String discoveryException = "";
/**
* the constructor for <code>Advertiser</code> instances, if an
@@ -136,8 +135,8 @@
protected static final Constructor advertiser;
/**
- * the constructor for <code>Locator</code> instances, if an
- * implementation exists.
+ * the constructor for <code>Locator</code> instances, if an implementation
+ * exists.
*/
protected static final Constructor locator;
@@ -153,7 +152,6 @@
*/
private static SLPDaemon daemon;
-
/**
* the constructor for <code>DirectoryAgent</code> instances, if an
* implementation exists.
@@ -166,14 +164,12 @@
*/
protected static final Constructor daDaemonConstr;
-
/**
* the DA daemon instance, if the implementation exists and no other DA is
- * already running on the machine.
+ * already running on the machine.
*/
public static DirectoryAgentDaemon daDaemon;
-
/**
* the next free XID.
*/
@@ -182,35 +178,38 @@
/**
* used to asynchronously receive replies. query XID -> reply queue (List)
*/
- private static Map<Integer,List<AbstractSLPMessage>> replyListeners = new HashMap<Integer,List<AbstractSLPMessage>>();
+ private static Map<Integer, List<AbstractSLPMessage>> replyListeners = new HashMap<Integer, List<AbstractSLPMessage>>();
/**
* Map of DAs:
*
* String scope -> list of Strings of DA URLs.
*/
- static Map<String,List<String>> dAs = new HashMap<String,List<String>>();
+ static Map<String, List<String>> dAs = new HashMap<String, List<String>>();
-
/**
* Map of sBTs for known DAs:
*
* String IP-Address -> Integer (32bit timestamp)
*
*/
- static Map<String,Integer> statelessBootTimestamps = new HashMap<String, Integer>();
-
-
+ static Map<String, Integer> statelessBootTimestamps = new HashMap<String, Integer>();
+
/**
* Map of DA SPIs:
*
* String DA URL -> String String.
*/
- static Map<String,List<String>> dASPIs = new HashMap<String,List<String>>(); // DA URL -> List of SPIs
-
+ static Map<String, List<String>> dASPIs = new HashMap<String, List<String>>(); // DA
+ // URL
+ // ->
+ // List
+ // of
+ // SPIs
/**
- * timestamp of last DA lookup. According to RFC2608 a DA lookup may only be performed every CONFIG_DA_FIND seconds
+ * timestamp of last DA lookup. According to RFC2608 a DA lookup may only be
+ * performed every CONFIG_DA_FIND seconds
*/
private static long lastDaLookup = 0;
@@ -219,18 +218,19 @@
/**
* MINA Connectors (outgoing)
*
- * SLPCore sends all messages, yet receives only replies.
- * The daemon listens for incoming UDP and TCP packets (requests) on SLP_PORT
+ * SLPCore sends all messages, yet receives only replies. The daemon listens
+ * for incoming UDP and TCP packets (requests) on SLP_PORT
*
*/
- private static AprDatagramConnector sender; //sends UDP packets (default)
- private static AprDatagramConnector[] senderList; //one for each interface
+ private static AprDatagramConnector sender; // sends UDP packets (default)
+ private static AprDatagramConnector[] senderList; // one for each interface
-
- //We also need a SocketConnector to send messages by TCP if necessary (and get the reply)
+ // We also need a SocketConnector to send messages by TCP if necessary (and
+ // get the reply)
private static AprSocketConnector tcpsender;
- //we also need dynamic acceptors to be able to listen for replies to multicast messages
+ // we also need dynamic acceptors to be able to listen for replies to
+ // multicast messages
private static AprDatagramAcceptor[] receiverList;
// the main receiver listening on the SLP port.
@@ -242,14 +242,13 @@
private static final SLPHandler handler = new SLPHandler();
-
/**
* initialize the core class.
*/
static {
try {
LOCALHOST = InetAddress.getLocalHost();
- } catch (Throwable t) {
+ } catch (final Throwable t) {
t.printStackTrace();
}
@@ -259,8 +258,8 @@
Constructor constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.AdvertiserImpl")
- .getConstructor(locale);
- } catch (Exception e) {
+ .getConstructor(locale);
+ } catch (final Exception e) {
}
advertiser = constr;
@@ -268,17 +267,18 @@
constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.LocatorImpl")
- .getConstructor(locale);
- } catch (Exception e) {
+ .getConstructor(locale);
+ } catch (final Exception e) {
}
locator = constr;
- // check, if a DA is available
+ // check, if a DA is available
constr = null;
try {
- constr = Class.forName("ch.ethz.iks.slp.impl.da.DirectoryAgentImpl")
- .getConstructor(new Class[]{});
- } catch (Exception e) {
+ constr = Class
+ .forName("ch.ethz.iks.slp.impl.da.DirectoryAgentImpl")
+ .getConstructor(new Class[] {});
+ } catch (final Exception e) {
}
directoryAgent = constr;
@@ -286,30 +286,30 @@
constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.SLPDaemonImpl")
- .getConstructor(new Class[]{});
- } catch (Exception e) {
+ .getConstructor(new Class[] {});
+ } catch (final Exception e) {
}
daemonConstr = constr;
-
// check, if a DADaemon is available
constr = null;
try {
- constr = Class.forName("ch.ethz.iks.slp.impl.da.DirectoryAgentDaemonImpl")
- .getConstructor(new Class[]{});
- } catch (Exception e) {
+ constr = Class.forName(
+ "ch.ethz.iks.slp.impl.da.DirectoryAgentDaemonImpl")
+ .getConstructor(new Class[] {});
+ } catch (final Exception e) {
}
daDaemonConstr = constr;
-
// read in the property file, if it exists
- File propFile = new File("jslp.properties");
+ final File propFile = new File("jslp.properties");
SLPConfiguration config;
try {
config = propFile.exists() ? new SLPConfiguration(propFile)
- : new SLPConfiguration();
- } catch (IOException e1) {
- System.out.println("Could not parse the property file" + propFile.toString());
+ : new SLPConfiguration();
+ } catch (final IOException e1) {
+ System.out.println("Could not parse the property file"
+ + propFile.toString());
e1.printStackTrace();
config = new SLPConfiguration();
}
@@ -320,30 +320,31 @@
// determine the interfaces on which jSLP runs on
String[] IPs = CONFIG.getInterfaces();
if (IPs == null) {
- List<String> workingInterfaces = new ArrayList<String>();
+ final List<String> workingInterfaces = new ArrayList<String>();
try {
// changed by Lorenz to work on Debian systems
- Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
- for (NetworkInterface intf : Collections.list(nets)){
- Enumeration<InetAddress> inetAdds = intf.getInetAddresses();
+ final Enumeration<NetworkInterface> nets = NetworkInterface
+ .getNetworkInterfaces();
+ for (final NetworkInterface intf : Collections.list(nets)) {
+ final Enumeration<InetAddress> inetAdds = intf
+ .getInetAddresses();
String ip = "";
- while (inetAdds.hasMoreElements()){
+ while (inetAdds.hasMoreElements()) {
ip = inetAdds.nextElement().getHostAddress();
- if (ip.indexOf(":")==-1){
+ if (ip.indexOf(":") == -1) {
break;
}
}
- if (!ip.equals("") && ip.indexOf(":")==-1){
+ if (!ip.equals("") && ip.indexOf(":") == -1) {
workingInterfaces.add(ip);
}
}
- } catch (SocketException se){
+ } catch (final SocketException se) {
}
- IPs = workingInterfaces.toArray(new String[]{});
-
+ IPs = workingInterfaces.toArray(new String[] {});
}
myIPs = IPs;
@@ -355,53 +356,52 @@
InetAddress mcast = null;
try {
mcast = InetAddress.getByName(SLPCore.SLP_MCAST_ADDRESS);
- } catch (UnknownHostException e1) {
+ } catch (final UnknownHostException e1) {
e1.printStackTrace();
}
MCAST_ADDRESS = mcast;
}
-
-
/**
* Initialize the daemon for the SA
*/
- protected static void initSA(){
+ protected static void initSA() {
- if (isSAInitialized){
+ if (isSAInitialized) {
return;
}
- if (!isInitialized){
+ if (!isInitialized) {
init();
}
if (!isListening) {
try {
initializeListeners();
- } catch (ServiceLocationException sle){
- System.out.println("[SLPCORE]: Unable to initialize MINA aceptors, aborting");
- //System.exit(1);
+ } catch (final ServiceLocationException sle) {
+ System.out
+ .println("[SLPCORE]: Unable to initialize MINA aceptors, aborting");
+ // System.exit(1);
}
}
-
+
// check, if there is already a SLP daemon runnung on port 427
// that can be either a jSLP daemon, or an OpenSLP daemon or something
// else. If not, try to start a new daemon instance.
if (daemonConstr != null) {
try {
- daemon = (SLPDaemon) daemonConstr.newInstance(new Object[]{});
- isSAInitialized=true;
- } catch (Exception e) {
+ daemon = (SLPDaemon) daemonConstr.newInstance(new Object[] {});
+ isSAInitialized = true;
+ } catch (final Exception e) {
daemon = null;
}
}
isSAInitialized = true;
-
- String[] daAddresses = CONFIG.getDaAddresses();
+
+ final String[] daAddresses = CONFIG.getDaAddresses();
if (daAddresses == null) {
if (noDiscovery) {
throw new IllegalArgumentException(
- "Configuration 'net.slp.noDaDiscovery=true' requires a non-empty list of preconfigured DAs");
+ "Configuration 'net.slp.noDaDiscovery=true' requires a non-empty list of preconfigured DAs");
}
} else {
try {
@@ -411,64 +411,71 @@
req.setServiceType(new ServiceType(SLP_DA_TYPE));
for (int i = 0; i < daAddresses.length; i++) {
try {
- InetAddress addr = InetAddress.getByName(daAddresses[i]);
+ final InetAddress addr = InetAddress
+ .getByName(daAddresses[i]);
discoveryException = daAddresses[i];
- InetSocketAddress sock = new InetSocketAddress(addr,SLP_PORT);
- DAAdvertisementMessage daa = (DAAdvertisementMessage) sendUnicastMessage(
- req,sock, true);
- if (daa==null){
- platform.logWarning("Error communicating with " + daAddresses[i]);
+ final InetSocketAddress sock = new InetSocketAddress(
+ addr, SLP_PORT);
+ final DAAdvertisementMessage daa = (DAAdvertisementMessage) sendUnicastMessage(
+ req, sock, true);
+ if (daa == null) {
+ platform.logWarning("Error communicating with "
+ + daAddresses[i]);
continue;
}
discoveryException = "";
- String[] scopes = (String[]) daa.getScopes();
+ final String[] scopes = (String[]) daa.getScopes();
for (int j = 0; j < scopes.length; j++) {
platform.logDebug("jSLP is adding DA, "
+ daAddresses[i] + " for the Scope, "
+ scopes[j]);
- SLPUtils.addValue(dAs, scopes[i].toLowerCase(), daAddresses[i]);
+ SLPUtils.addValue(dAs, scopes[i].toLowerCase(),
+ daAddresses[i]);
}
- } catch (ServiceLocationException e) {
+ } catch (final ServiceLocationException e) {
+ discoveryException = "";
+ platform.logWarning("Error communicating with "
+ + daAddresses[i], e);
+ } catch (final UnknownHostException e) {
discoveryException = "";
- platform.logWarning("Error communicating with " + daAddresses[i], e);
- } catch (UnknownHostException e) {
- discoveryException="";
- platform.logWarning("Unknown net.slp.DAAddresses address: " + daAddresses[i], e);
+ platform.logWarning(
+ "Unknown net.slp.DAAddresses address: "
+ + daAddresses[i], e);
}
}
- } catch (IllegalArgumentException ise) {
- discoveryException="";
+ } catch (final IllegalArgumentException ise) {
+ discoveryException = "";
platform.logDebug("May never happen", ise);
}
}
}
- protected static void initDA(ServiceStore store){
+ protected static void initDA(final ServiceStore store) {
- if (isDAInitialized){
+ if (isDAInitialized) {
return;
}
- if (!isInitialized){
+ if (!isInitialized) {
init();
}
if (!isListening) {
try {
initializeListeners();
- } catch (ServiceLocationException sle){
- System.out.println("[SLPCORE]: Unable to initialize MINA aceptors, aborting");
+ } catch (final ServiceLocationException sle) {
+ System.out
+ .println("[SLPCORE]: Unable to initialize MINA aceptors, aborting");
}
}
-
-
// TODO: localization
if (daDaemonConstr != null) {
try {
- daDaemon = (DirectoryAgentDaemon) daDaemonConstr.newInstance(new Object[]{});
+ daDaemon = (DirectoryAgentDaemon) daDaemonConstr
+ .newInstance(new Object[] {});
daDaemon.setStore(store);
- isDAInitialized=true;
- } catch (Exception e) {
+ isDAInitialized = true;
+ } catch (final Exception e) {
e.printStackTrace();
daDaemon = null;
}
@@ -476,26 +483,24 @@
}
-
protected static void init() {
- if(isInitialized) {
+ if (isInitialized) {
return;
}
initializeMina();
-
isInitialized = true;
platform.logDebug("jSLP is running on the following interfaces: "
+ java.util.Arrays.asList(myIPs));
platform.logDebug("jSLP is using port: " + SLP_PORT);
- String[] daAddresses = CONFIG.getDaAddresses();
+ final String[] daAddresses = CONFIG.getDaAddresses();
if (daAddresses == null) {
if (noDiscovery) {
throw new IllegalArgumentException(
- "Configuration 'net.slp.noDaDiscovery=true' requires a non-empty list of preconfigured DAs");
+ "Configuration 'net.slp.noDaDiscovery=true' requires a non-empty list of preconfigured DAs");
}
} else {
try {
@@ -505,28 +510,35 @@
req.setServiceType(new ServiceType(SLP_DA_TYPE));
for (int i = 0; i < daAddresses.length; i++) {
try {
- InetAddress addr = InetAddress.getByName(daAddresses[i]);
- InetSocketAddress sock = new InetSocketAddress(addr,SLP_PORT);
- DAAdvertisementMessage daa = (DAAdvertisementMessage) sendUnicastMessage(
- req,sock, true);
- if (daa==null){
- platform.logWarning("Error communicating with " + daAddresses[i]);
+ final InetAddress addr = InetAddress
+ .getByName(daAddresses[i]);
+ final InetSocketAddress sock = new InetSocketAddress(
+ addr, SLP_PORT);
+ final DAAdvertisementMessage daa = (DAAdvertisementMessage) sendUnicastMessage(
+ req, sock, true);
+ if (daa == null) {
+ platform.logWarning("Error communicating with "
+ + daAddresses[i]);
continue;
}
- String[] scopes = (String[]) daa.getScopes();
+ final String[] scopes = (String[]) daa.getScopes();
for (int j = 0; j < scopes.length; j++) {
platform.logDebug("jSLP is adding DA, "
+ daAddresses[i] + " for the Scope, "
+ scopes[j]);
- SLPUtils.addValue(dAs, scopes[i].toLowerCase(), daAddresses[i]);
+ SLPUtils.addValue(dAs, scopes[i].toLowerCase(),
+ daAddresses[i]);
}
- } catch (ServiceLocationException e) {
- platform.logWarning("Error communicating with " + daAddresses[i], e);
- } catch (UnknownHostException e) {
- platform.logWarning("Unknown net.slp.DAAddresses address: " + daAddresses[i], e);
+ } catch (final ServiceLocationException e) {
+ platform.logWarning("Error communicating with "
+ + daAddresses[i], e);
+ } catch (final UnknownHostException e) {
+ platform.logWarning(
+ "Unknown net.slp.DAAddresses address: "
+ + daAddresses[i], e);
}
}
- } catch (IllegalArgumentException ise) {
+ } catch (final IllegalArgumentException ise) {
platform.logDebug("May never happen", ise);
}
}
@@ -534,50 +546,51 @@
if (!noDiscovery) {
// perform an initial lookup
try {
- List<String> scopes = new ArrayList<String>();
+ final List<String> scopes = new ArrayList<String>();
scopes.add("default");
- // wait CONFIG_START_WAIT
- // added by Lorenz just to make it clear that RFC2608 section 12.2.1 has been considered
- long rand = Math.round(Math.random()*CONFIG.getConfigStartWait());
+ // wait CONFIG_START_WAIT
+ // added by Lorenz just to make it clear that RFC2608 section
+ // 12.2.1 has been considered
+ final long rand = Math.round(Math.random()
+ * CONFIG.getConfigStartWait());
Thread.sleep(rand);
- daLookup((String[]) scopes.toArray(new String[]{}));
- } catch (Exception e) {
+ daLookup((String[]) scopes.toArray(new String[] {}));
+ } catch (final Exception e) {
platform.logError("Exception in initial DA lookup", e);
}
}
}
- public static synchronized void shutdownAdvertiser(){
- daemon=null;
- isSAInitialized= false;
+ public static synchronized void shutdownAdvertiser() {
+ daemon = null;
+ isSAInitialized = false;
}
- public static synchronized void shutdownDirectoryAgent(){
- if (daDaemon!=null){
+ public static synchronized void shutdownDirectoryAgent() {
+ if (daDaemon != null) {
daDaemon.shutdown();
}
- daDaemon=null;
+ daDaemon = null;
isDAInitialized = false;
}
-// public static synchronized void shutdownUserAgent(){
-// if (!isSAInitialized && !isDAInitialized && isInitialized){
-// sender.dispose();
-// for (AprDatagramConnector ac : senderList){
-// ac.dispose();
-// }
-// tcpsender.dispose();
-// for (AprDatagramAcceptor aa : receiverList){
-// aa.dispose();
-// }
-// multicastAcceptor.dispose();
-// multicastTCPAcceptor.dispose();
-// isInitialized = false;
-// }
-// }
-
-
+ // public static synchronized void shutdownUserAgent(){
+ // if (!isSAInitialized && !isDAInitialized && isInitialized){
+ // sender.dispose();
+ // for (AprDatagramConnector ac : senderList){
+ // ac.dispose();
+ // }
+ // tcpsender.dispose();
+ // for (AprDatagramAcceptor aa : receiverList){
+ // aa.dispose();
+ // }
+ // multicastAcceptor.dispose();
+ // multicastTCPAcceptor.dispose();
+ // isInitialized = false;
+ // }
+ // }
+
/**
* get my own IP.
*
@@ -585,19 +598,21 @@
*/
public static InetAddress getMyIP() {
try {
- int i = 0;
+ final int i = 0;
String result = myIPs[i];
- while ((result.equals("127.0.0.1") || result.equals("127.0.1.1")) && (myIPs.length > (i+1))){
+ while ((result.equals("127.0.0.1") || result.equals("127.0.1.1"))
+ && myIPs.length > i + 1) {
result = myIPs[i];
}
- if (result.equals("127.0.1.1")){
+ if (result.equals("127.0.1.1")) {
return InetAddress.getByName("127.0.0.1");
}
return InetAddress.getByName(result);
- } catch (UnknownHostException e) {
- platform.logError("Unknown net.slp.interfaces address: " + myIPs[0], e);
+ } catch (final UnknownHostException e) {
+ platform.logError(
+ "Unknown net.slp.interfaces address: " + myIPs[0], e);
return null;
}
}
@@ -614,8 +629,6 @@
return new ArrayList<String>(dAs.keySet());
}
-
-
/**
* get the next XID.
*
@@ -642,7 +655,8 @@
static void daLookup(final String[] scopes) throws ServiceLocationException {
// added by Lorenz to ensure adherence to RFC2608 section 12.2.1
- if ((System.currentTimeMillis()-lastDaLookup) < CONFIG.getConfigDaFind()){
+ if (System.currentTimeMillis() - lastDaLookup < CONFIG
+ .getConfigDaFind()) {
// oops, too soon...
return;
}
@@ -652,64 +666,79 @@
// added loop for each IP for each interface
// used 1.4 SocketAddress
// altered by Jan to be backwards compatible with Java 2
- // changed by Lorenz to send the same message out on each interface and then wait...
-// build a ServiceRequestMessage
- ServiceRequestMessage sreq = new ServiceRequestMessage();
+ // changed by Lorenz to send the same message out on each interface
+ // and then wait...
+ // build a ServiceRequestMessage
+ final ServiceRequestMessage sreq = new ServiceRequestMessage();
sreq.setServiceType(new ServiceType(SLP_DA_TYPE));
sreq.setScopes(scopes);
sreq.setLocale(SLPCore.DEFAULT_LOCALE);
sreq.setXid(SLPCore.nextXid());
sreq.setMulticast(true);
- InetSocketAddress addr = new InetSocketAddress(MCAST_ADDRESS, SLP_PORT);
- WriteFuture[] futures = new WriteFuture[myIPs.length];
+ final InetSocketAddress addr = new InetSocketAddress(MCAST_ADDRESS,
+ SLP_PORT);
+ final WriteFuture[] futures = new WriteFuture[myIPs.length];
// send the message over each interface with the same xid
for (int i = 0; i < myIPs.length; i++) {
try {
- platform.logTraceMessage("SENT " + sreq + "(udp multicast)");
- ConnectFuture connFuture = senderList[i].connect(addr);
+ platform
+ .logTraceMessage("SENT " + sreq + "(udp multicast)");
+ final ConnectFuture connFuture = senderList[i]
+ .connect(addr);
connFuture.await();
- IoSession session = (AprDatagramSession) connFuture.getSession();
- InetSocketAddress local = (InetSocketAddress) session.getLocalAddress();
+ final IoSession session = (AprDatagramSession) connFuture
+ .getSession();
+ final InetSocketAddress local = (InetSocketAddress) session
+ .getLocalAddress();
receiverList[i] = initReceiver(local);
futures[i] = session.write(sreq);
- platform.logTraceMessage("SENT (" + addr.getHostName() + ":" + addr.getPort() + ") "
- + sreq + " (via udp port " +((InetSocketAddress) session.getLocalAddress()).getPort()+")");
- } catch (InterruptedException ie){
+ platform.logTraceMessage("SENT ("
+ + addr.getHostName()
+ + ":"
+ + addr.getPort()
+ + ") "
+ + sreq
+ + " (via udp port "
+ + ((InetSocketAddress) session.getLocalAddress())
+ .getPort() + ")");
+ } catch (final InterruptedException ie) {
// what do i have to de here?
}
- }
- // make sure the unused receivers are disposed when their lifetime is up
+ }
+ // make sure the unused receivers are disposed when their lifetime
+ // is up
setupReceiverThread(CONFIG.getWaitTime());
// check if they all went out
for (int i = 0; i < futures.length; i++) {
boolean success = false;
try {
- // it would actually be better to wait for a total of WaitTime, not WaitTime per future...
+ // it would actually be better to wait for a total of
+ // WaitTime, not WaitTime per future...
success = futures[i].await(CONFIG.getWaitTime());
- } catch (InterruptedException ie){
- // what do i have to do here? decrement i and continue?
+ } catch (final InterruptedException ie) {
+ // what do i have to do here? decrement i and continue?
}
if (!success) {
// blacklist address
- final List<String> remaining = new ArrayList<String>(java.util.Arrays
- .asList(myIPs));
+ final List<String> remaining = new ArrayList<String>(
+ java.util.Arrays.asList(myIPs));
final String faulty = myIPs[i];
remaining.remove(faulty);
myIPs = (String[]) remaining.toArray(new String[remaining
- .size()]);
+ .size()]);
platform.logDebug("Blacklisting IP " + faulty);
- }
+ }
}
// don't forget to set the timestamp ;)
lastDaLookup = System.currentTimeMillis();
- } catch (IllegalArgumentException ise) {
+ } catch (final IllegalArgumentException ise) {
platform.logDebug("May never happen, no filter set", ise);
}
}
/**
- * send a unicast message over TCP.
+ * send a unicast message over TCP.
*
* @param msg
* the message.
@@ -717,51 +746,61 @@
* @throws ServiceLocationException
* in case of network errors.
*/
- static AbstractSLPReplyMessage sendMessageTCP(final AbstractSLPMessage msg,final InetSocketAddress addr, final boolean expectReply)
- throws ServiceLocationException {
+ static AbstractSLPReplyMessage sendMessageTCP(final AbstractSLPMessage msg,
+ final InetSocketAddress addr, final boolean expectReply)
+ throws ServiceLocationException {
try {
- // Debian and co. have an entry for 127.0.1.1 which leads to problems,
- //therefor that address is changed to 127.0.0.1
+ // Debian and co. have an entry for 127.0.1.1 which leads to
+ // problems,
+ // therefor that address is changed to 127.0.0.1
InetSocketAddress sendAddr = addr;
- if (addr.getAddress().getHostAddress().equals("127.0.1.1")){
- sendAddr = new InetSocketAddress("127.0.0.1",SLP_PORT);
+ if (addr.getAddress().getHostAddress().equals("127.0.1.1")) {
+ sendAddr = new InetSocketAddress("127.0.0.1", SLP_PORT);
}
if (msg.getXid() == 0) {
msg.setXid(nextXid());
}
- ConnectFuture future1 = tcpsender.connect(sendAddr);
+ final ConnectFuture future1 = tcpsender.connect(sendAddr);
future1.awaitUninterruptibly();
if (!future1.isConnected()) {
- throw new ServiceLocationException(ServiceLocationException.NETWORK_INIT_FAILED,"Unable to open TCP socket");
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_INIT_FAILED,
+ "Unable to open TCP socket");
}
- IoSession session = future1.getSession();
+ final IoSession session = future1.getSession();
session.write(msg);
- platform.logTraceMessage("SENT (" + addr.getAddress().getHostAddress() + ":" + addr.getPort() + ") "
- + msg + " (via tcp port " + ((InetSocketAddress)session.getLocalAddress()).getPort()
+ platform.logTraceMessage("SENT ("
+ + addr.getAddress().getHostAddress() + ":" + addr.getPort()
+ + ") " + msg + " (via tcp port "
+ + ((InetSocketAddress) session.getLocalAddress()).getPort()
+ ")");
- if (!expectReply){
+ if (!expectReply) {
session.close(false);
return null;
}
-// need to wait for a reply that is handled by the receiver
- synchronized (replyListeners){
- List<AbstractSLPMessage> reply = (List<AbstractSLPMessage>) replyListeners.get(new Integer(msg.getXid()));
- long start = System.currentTimeMillis();
- while(reply.isEmpty()){
- if (System.currentTimeMillis()-start>SLPCore.CONFIG.getWaitTime()){
+ // need to wait for a reply that is handled by the receiver
+ synchronized (replyListeners) {
+ final List<AbstractSLPMessage> reply = (List<AbstractSLPMessage>) replyListeners
+ .get(new Integer(msg.getXid()));
+ final long start = System.currentTimeMillis();
+ while (reply.isEmpty()) {
+ if (System.currentTimeMillis() - start > SLPCore.CONFIG
+ .getWaitTime()) {
session.close(false);
- throw new ServiceLocationException(ServiceLocationException.NETWORK_TIMED_OUT,"Reply timed out");
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_TIMED_OUT,
+ "Reply timed out");
}
- replyListeners.wait(SLPCore.CONFIG.getWaitTime(),SLPCore.CONFIG.getWaitTime());
+ replyListeners.wait(SLPCore.CONFIG.getWaitTime(),
+ SLPCore.CONFIG.getWaitTime());
}
session.close(false);
return (AbstractSLPReplyMessage) reply.get(0);
}
-
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, e.getMessage());
}
@@ -773,14 +812,15 @@
* @param msg
* the message to be sent.
* @param addr
- * the InetSocketAddress containing the destination address
+ * the InetSocketAddress containing the destination address
* @param expectReply
* waits for a reply if set to true.
* @return the reply.
* @throws ServiceLocationException
* in case of network errors etc.
*/
- static AbstractSLPReplyMessage sendReliableUnicastMessage(final AbstractSLPMessage msg,final InetSocketAddress addr,
+ static AbstractSLPReplyMessage sendReliableUnicastMessage(
+ final AbstractSLPMessage msg, final InetSocketAddress addr,
final boolean expectReply) throws ServiceLocationException {
List<AbstractSLPMessage> reply = new ArrayList<AbstractSLPMessage>();
if (msg.getXid() == 0) {
@@ -791,76 +831,82 @@
}
try {
- //register this send event to catch replies
- if (expectReply){
- synchronized (replyListeners){
+ // register this send event to catch replies
+ if (expectReply) {
+ synchronized (replyListeners) {
reply = new ArrayList<AbstractSLPMessage>();
replyListeners.put(new Integer(msg.getXid()), reply);
}
}
// Again, Debian can mess things up a bit, so we fix that here...
InetSocketAddress sendAddr = addr;
- if (addr.getAddress().getHostAddress().equals("127.0.1.1")){
- sendAddr = new InetSocketAddress("127.0.0.1",addr.getPort());
+ if (addr.getAddress().getHostAddress().equals("127.0.1.1")) {
+ sendAddr = new InetSocketAddress("127.0.0.1", addr.getPort());
}
- ConnectFuture connFuture = sender.connect(sendAddr);
+ final ConnectFuture connFuture = sender.connect(sendAddr);
connFuture.await(CONFIG.getDatagramMaxWait());
- if (!connFuture.isConnected()){
- platform.logError("Failed to send over udp: "+msg+" to "+addr.getAddress().getHostAddress()+":"+addr.getPort());
+ if (!connFuture.isConnected()) {
+ platform.logError("Failed to send over udp: " + msg + " to "
+ + addr.getAddress().getHostAddress() + ":"
+ + addr.getPort());
return null;
}
- IoSession session = (AprDatagramSession) connFuture.getSession();
+ final IoSession session = (AprDatagramSession) connFuture
+ .getSession();
session.write(msg);
- platform.logTraceMessage("SENT (" + addr.getAddress().getHostAddress() + ":" + addr.getPort() + ") "
- + msg + " (via udp port " +((InetSocketAddress) session.getLocalAddress()).getPort()
+ platform.logTraceMessage("SENT ("
+ + addr.getAddress().getHostAddress() + ":" + addr.getPort()
+ + ") " + msg + " (via udp port "
+ + ((InetSocketAddress) session.getLocalAddress()).getPort()
+ ")");
- if (!expectReply){
+ if (!expectReply) {
session.close(false);
return null;
}
+ // changed by Lorenz to ensure operation according to RFC2608
+ // section 12.3
+ // need to wait for a reply that is handled by the receiver
+ synchronized (replyListeners) {
-
-
- // changed by Lorenz to ensure operation according to RFC2608 section 12.3
- //need to wait for a reply that is handled by the receiver
- synchronized (replyListeners){
-
- reply=(List<AbstractSLPMessage>) replyListeners.get(new Integer(msg.getXid()));
+ reply = (List<AbstractSLPMessage>) replyListeners
+ .get(new Integer(msg.getXid()));
}
- synchronized (reply){
+ synchronized (reply) {
long now = System.currentTimeMillis();
- long finalTimeout = now + CONFIG.getConfigRetryMax();
+ final long finalTimeout = now + CONFIG.getConfigRetryMax();
long waittime = CONFIG.getConfigRetry();
- while(reply.isEmpty()){
- if ((now=System.currentTimeMillis())>=finalTimeout){
+ while (reply.isEmpty()) {
+ if ((now = System.currentTimeMillis()) >= finalTimeout) {
return null;
}
reply.wait(waittime);
- if (reply.isEmpty()){
+ if (reply.isEmpty()) {
// resend
- if (session.isConnected()){
+ if (session.isConnected()) {
session.write(msg);
} else {
return null;
}
now = System.currentTimeMillis();
- waittime = 2*waittime;
+ waittime = 2 * waittime;
}
}
- AbstractSLPReplyMessage r = (AbstractSLPReplyMessage) reply.get(0);
- //remove the "waiting for reply" registration
+ final AbstractSLPReplyMessage r = (AbstractSLPReplyMessage) reply
+ .get(0);
+ // remove the "waiting for reply" registration
replyListeners.remove(new Integer(msg.getXid()));
session.close(false);
return r;
}
-
- } catch (Throwable t) {
- if (t instanceof IllegalMonitorStateException){
+ } catch (final Throwable t) {
+ if (t instanceof IllegalMonitorStateException) {
platform.logDebug(t.getMessage(), t);
- throw new ServiceLocationException(ServiceLocationException.NETWORK_TIMED_OUT,"Reply timed out");
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_TIMED_OUT,
+ "Reply timed out");
} else {
platform.logDebug(t.getMessage(), t);
throw new ServiceLocationException((short) 1, t.getMessage());
@@ -868,21 +914,21 @@
}
}
-
/**
* send a unicast message over UDP.
*
* @param msg
* the message to be sent.
* @param addr
- * the InetSocketAddress containing the destination address
+ * the InetSocketAddress containing the destination address
* @param expectReply
* waits for a reply if set to true.
* @return the reply.
* @throws ServiceLocationException
* in case of network errors etc.
*/
- public static AbstractSLPReplyMessage sendUnicastMessage(final AbstractSLPMessage msg,final InetSocketAddress addr,
+ public static AbstractSLPReplyMessage sendUnicastMessage(
+ final AbstractSLPMessage msg, final InetSocketAddress addr,
final boolean expectReply) throws ServiceLocationException {
List<AbstractSLPMessage> reply = new ArrayList<AbstractSLPMessage>();
if (msg.getXid() == 0) {
@@ -893,65 +939,70 @@
}
try {
- //register this send event to catch replies
- if (expectReply){
- synchronized (replyListeners){
+ // register this send event to catch replies
+ if (expectReply) {
+ synchronized (replyListeners) {
reply = new ArrayList<AbstractSLPMessage>();
replyListeners.put(new Integer(msg.getXid()), reply);
}
}
// Again, Debian can mess things up a bit, so we fix that here...
InetSocketAddress sendAddr = addr;
- if (addr.getAddress().getHostAddress().equals("127.0.1.1")){
- sendAddr = new InetSocketAddress("127.0.0.1",addr.getPort());
+ if (addr.getAddress().getHostAddress().equals("127.0.1.1")) {
+ sendAddr = new InetSocketAddress("127.0.0.1", addr.getPort());
}
- ConnectFuture connFuture = sender.connect(sendAddr);
+ final ConnectFuture connFuture = sender.connect(sendAddr);
connFuture.await(CONFIG.getDatagramMaxWait());
- if (!connFuture.isConnected()){
- platform.logError("Failed to send over udp: "+msg+" to "+addr.getAddress().getHostAddress()+":"+addr.getPort());
+ if (!connFuture.isConnected()) {
+ platform.logError("Failed to send over udp: " + msg + " to "
+ + addr.getAddress().getHostAddress() + ":"
+ + addr.getPort());
return null;
- }
- IoSession session = (AprDatagramSession) connFuture.getSession();
+ }
+ final IoSession session = (AprDatagramSession) connFuture
+ .getSession();
session.write(msg);
- platform.logTraceMessage("SENT (" + addr.getAddress().getHostAddress() + ":" + addr.getPort() + ") "
- + msg + " (via udp port " +((InetSocketAddress) session.getLocalAddress()).getPort()
+ platform.logTraceMessage("SENT ("
+ + addr.getAddress().getHostAddress() + ":" + addr.getPort()
+ + ") " + msg + " (via udp port "
+ + ((InetSocketAddress) session.getLocalAddress()).getPort()
+ ")");
- if (!expectReply){
+ if (!expectReply) {
session.close(false);
return null;
}
+ // need to wait for a reply that is handled by the receiver
+ synchronized (replyListeners) {
-
- //need to wait for a reply that is handled by the receiver
- synchronized (replyListeners){
-
- reply=(List<AbstractSLPMessage>) replyListeners.get(new Integer(msg.getXid()));
+ reply = (List<AbstractSLPMessage>) replyListeners
+ .get(new Integer(msg.getXid()));
}
- synchronized (reply){
+ synchronized (reply) {
long now = System.currentTimeMillis();
- long timeout = System.currentTimeMillis()+CONFIG.getDatagramMaxWait();
- while(reply.isEmpty()){
- if ((now=System.currentTimeMillis())>=timeout){
+ final long timeout = System.currentTimeMillis()
+ + CONFIG.getDatagramMaxWait();
+ while (reply.isEmpty()) {
+ if ((now = System.currentTimeMillis()) >= timeout) {
return null;
}
- reply.wait(timeout-now);
+ reply.wait(timeout - now);
}
- AbstractSLPReplyMessage r = (AbstractSLPReplyMessage) reply.get(0);
- //remove the "waiting for reply" registration
+ final AbstractSLPReplyMessage r = (AbstractSLPReplyMessage) reply
+ .get(0);
+ // remove the "waiting for reply" registration
replyListeners.remove(new Integer(msg.getXid()));
session.close(false);
return r;
}
-
-
-
- } catch (Throwable t) {
- if (t instanceof IllegalMonitorStateException){
+ } catch (final Throwable t) {
+ if (t instanceof IllegalMonitorStateException) {
platform.logDebug(t.getMessage(), t);
- throw new ServiceLocationException(ServiceLocationException.NETWORK_TIMED_OUT,"Reply timed out");
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_TIMED_OUT,
+ "Reply timed out");
} else {
platform.logDebug(t.getMessage(), t);
throw new ServiceLocationException((short) 1, t.getMessage());
@@ -959,9 +1010,6 @@
}
}
-
-
-
/**
* send a request via multicast convergence algorithm.
*
@@ -971,21 +1019,20 @@
* @throws ServiceLocationException
* in case of network errors.
*/
- static void multicastConvergence(final ReplyFuture replyfuture,final AbstractSLPRequestMessage msg)
- throws ServiceLocationException {
+ static void multicastConvergence(final ReplyFuture replyfuture,
+ final AbstractSLPRequestMessage msg)
+ throws ServiceLocationException {
if (msg.getXid() == 0) {
msg.setXid(SLPCore.nextXid());
}
- new MulticastConvergenceThread(replyfuture,msg);
-
-
+ new MulticastConvergenceThread(replyfuture, msg);
}
- private static boolean isLocalResponder(InetAddress addr) {
- if (addr.getHostAddress().equals("127.0.0.1")){
+ private static boolean isLocalResponder(final InetAddress addr) {
+ if (addr.getHostAddress().equals("127.0.0.1")) {
return true;
}
for (int i = 0; i < SLPCore.myIPs.length; i++) {
@@ -996,24 +1043,23 @@
return false;
}
-
-
/**
* Initialize the MINA components
*/
- private static void initializeMina(){
+ private static void initializeMina() {
try {
// initialize the UDP senders, required by both SA and UA
- senderList=new AprDatagramConnector[myIPs.length];
- for (int i=0;i<myIPs.length;i++){
- AprDatagramConnector adc = new AprDatagramConnector();
+ senderList = new AprDatagramConnector[myIPs.length];
+ for (int i = 0; i < myIPs.length; i++) {
+ final AprDatagramConnector adc = new AprDatagramConnector();
adc.getSessionConfig().setUseReadOperation(true);
adc.setTtl(CONFIG.getMcastTTL());
- adc.setMulticastInterface(myIPs[i]);
+ adc.setMulticastInterface(myIPs[i]);
adc.setHandler(handler);
- adc.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ adc.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
adc.getSessionConfig().setReuseAddress(true);
- senderList[i]=adc;
+ senderList[i] = adc;
}
// default sender, sends out over default multicast interface...
@@ -1021,128 +1067,142 @@
sender.getSessionConfig().setUseReadOperation(true);
sender.setTtl(CONFIG.getMcastTTL());
sender.setHandler(handler);
- sender.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ sender.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
sender.getSessionConfig().setReuseAddress(true);
// initialize the TCP sender (and reply-receiver...)
tcpsender = new AprSocketConnector();
- tcpsender.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ tcpsender.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
tcpsender.setHandler(handler);
-
- } catch (Exception e){
- platform.logError("[SLPCore] Unable to initialize MINA framework, aborting.", e);
- System.out.println("[SLPCORE]: Unable to initialize MINA framework, possibly because another SLP implementation is running on the same ports");
+ } catch (final Exception e) {
+ platform.logError(
+ "[SLPCore] Unable to initialize MINA framework, aborting.",
+ e);
+ System.out
+ .println("[SLPCORE]: Unable to initialize MINA framework, possibly because another SLP implementation is running on the same ports");
}
}
-
- protected static void initializeListeners() throws ServiceLocationException{
- if (isListening){
+ protected static void initializeListeners() throws ServiceLocationException {
+ if (isListening) {
return;
}
- try{
+ try {
multicastTCPAcceptor = new AprSocketAcceptor();
multicastTCPAcceptor.setHandler(handler);
- multicastTCPAcceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
- //TODO: add thread model ?
+ multicastTCPAcceptor.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ // TODO: add thread model ?
multicastTCPAcceptor.setReuseAddress(true);
multicastTCPAcceptor.bind(new InetSocketAddress(SLPCore.SLP_PORT));
-
- //set up the UDP receiver
+ // set up the UDP receiver
multicastAcceptor = new AprDatagramAcceptor();
multicastAcceptor.setHandler(handler);
multicastAcceptor.joinGroup(SLPCore.MCAST_ADDRESS.getHostAddress());
- multicastAcceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
- multicastAcceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newFixedThreadPool(10)));
+ multicastAcceptor.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ multicastAcceptor.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(Executors.newFixedThreadPool(10)));
- DatagramSessionConfig dcfg = multicastAcceptor.getSessionConfig();
+ final DatagramSessionConfig dcfg = multicastAcceptor
+ .getSessionConfig();
dcfg.setReuseAddress(true);
multicastAcceptor.bind(new InetSocketAddress(SLPCore.SLP_PORT));
-
+
isListening = true;
-
- } catch (Exception e){
- platform.logError("[SLPCore] Unable to initialize MINA acceptors.", e);
- throw new ServiceLocationException(ServiceLocationException.NETWORK_INIT_FAILED,e.getMessage());
+
+ } catch (final Exception e) {
+ platform.logError("[SLPCore] Unable to initialize MINA acceptors.",
+ e);
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_INIT_FAILED, e
+ .getMessage());
}
}
-
-
/**
* initialize a new receiver to listen for replies to multicast requests
*
* @param localaddress
- * The address this receiver should listen on (unicast)
- * @return
- * The initialized and bound AprDatagramAcceptor
+ * The address this receiver should listen on (unicast)
+ * @return The initialized and bound AprDatagramAcceptor
*
* @throws ServiceLocationException
*/
- private static AprDatagramAcceptor initReceiver(InetSocketAddress localaddress) throws ServiceLocationException {
- AprDatagramAcceptor recv = new AprDatagramAcceptor();
+ private static AprDatagramAcceptor initReceiver(
+ InetSocketAddress localaddress) throws ServiceLocationException {
+ final AprDatagramAcceptor recv = new AprDatagramAcceptor();
try {
- if (localaddress.getPort()==0){
- //if no port is specified, listen on the default SLP port for this application
- localaddress = new InetSocketAddress(localaddress.getAddress().getHostAddress(),SLP_PORT);
- }
- if (localaddress.getAddress().getHostAddress().equals("127.0.1.1")){
- //Debian and co. have an entry in .hosts that messes things up, this line fixes that
- localaddress = new InetSocketAddress("127.0.0.1",localaddress.getPort());
+ if (localaddress.getPort() == 0) {
+ // if no port is specified, listen on the default SLP port for
+ // this application
+ localaddress = new InetSocketAddress(localaddress.getAddress()
+ .getHostAddress(), SLP_PORT);
+ }
+ if (localaddress.getAddress().getHostAddress().equals("127.0.1.1")) {
+ // Debian and co. have an entry in /etc/hosts that messes things up,
+ // this line fixes that
+ localaddress = new InetSocketAddress("127.0.0.1", localaddress
+ .getPort());
}
- //initialize the UDP receiver.
+ // initialize the UDP receiver.
recv.setHandler(handler);
- recv.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
- // let each incoming packet be processed in a new thread if possible, as this acceptor
+ recv.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new SLPProtocolCodecFactory()));
+ // let each incoming packet be processed in a new thread if
+ // possible, as this acceptor
// deals with replies to multicast requests
- DatagramSessionConfig dcfg = recv.getSessionConfig();
+ final DatagramSessionConfig dcfg = recv.getSessionConfig();
dcfg.setReuseAddress(true);
recv.bind(localaddress);
return recv;
- } catch (Exception e){
+ } catch (final Exception e) {
recv.dispose();
- throw new ServiceLocationException(ServiceLocationException.NETWORK_INIT_FAILED,"Unable to set up UDP receiver");
+ throw new ServiceLocationException(
+ ServiceLocationException.NETWORK_INIT_FAILED,
+ "Unable to set up UDP receiver");
}
}
-
-
- public static SLPDaemon getDaemon(){
+ public static SLPDaemon getDaemon() {
return daemon;
}
- public static DirectoryAgentDaemon getDirectoryAgentDaemon(){
+ public static DirectoryAgentDaemon getDirectoryAgentDaemon() {
return daDaemon;
}
- public static List<String> getDas(String scope){
+ public static List<String> getDas(final String scope) {
return dAs.get(scope.toLowerCase());
}
- public static PlatformAbstraction getPlatform(){
+ public static PlatformAbstraction getPlatform() {
return platform;
}
- public static synchronized Map getReplyListeners(){
+ public static synchronized Map getReplyListeners() {
return replyListeners;
}
-
-
/**
- * add a reply to the queue and notify any thread waiting for such a reply...
+ * add a reply to the queue and notify any thread waiting for such a
+ * reply...
*
* @param msg
- * the message to be added as a reply
+ * the message to be added as a reply
* @param address
- * the source address, only required for logging
+ * the source address, only required for logging
*/
- public static synchronized void addReply(AbstractSLPMessage msg, InetSocketAddress address){
- List<AbstractSLPMessage> queue = (List<AbstractSLPMessage>) replyListeners.get(new Integer(msg.getXid()));
+ public static synchronized void addReply(final AbstractSLPMessage msg,
+ final InetSocketAddress address) {
+ final List<AbstractSLPMessage> queue = (List<AbstractSLPMessage>) replyListeners
+ .get(new Integer(msg.getXid()));
if (queue != null) {
synchronized (queue) {
queue.add(msg);
@@ -1150,9 +1210,8 @@
}
return;
} else {
- platform.logTraceReg("SRVREPLY recieved ("
- + address.getAddress() + ":" + address.getPort() + ") "
- + msg.toString()
+ platform.logTraceReg("SRVREPLY recieved (" + address.getAddress()
+ + ":" + address.getPort() + ") " + msg.toString()
+ " but not replyListeners present anymore");
}
return;
@@ -1167,131 +1226,143 @@
private static void setupReceiverThread(final long minLifetime) {
new Thread("DALookupThread") {
public void run() {
- platform.logDebug("[DALOOKUP]: Thread starting with lifetime " + minLifetime);
+ platform.logDebug("[DALOOKUP]: Thread starting with lifetime "
+ + minLifetime);
// calculate the end of lifetime
- long timeout = System.currentTimeMillis() + minLifetime+1000;
+ final long timeout = System.currentTimeMillis() + minLifetime
+ + 1000;
long now;
// while lifetime is not expired
- while ((now=System.currentTimeMillis()) < timeout) {
+ while ((now = System.currentTimeMillis()) < timeout) {
try {
- long remain = timeout-now;
+ final long remain = timeout - now;
Thread.sleep(remain);
- } catch (InterruptedException ie){
+ } catch (final InterruptedException ie) {
continue;
}
}
- for (int i=0;i<receiverList.length;i++){
+ for (int i = 0; i < receiverList.length; i++) {
receiverList[i].dispose();
}
- platform.logDebug("[DALOOKUP]: Thread stopping after " + minLifetime);
+ platform.logDebug("[DALOOKUP]: Thread stopping after "
+ + minLifetime);
}
}.start();
}
private static class MulticastConvergenceThread extends Thread {
-
private final ReplyFuture replyfuture;
private final AbstractSLPRequestMessage msg;
- public MulticastConvergenceThread(ReplyFuture rf,AbstractSLPRequestMessage req){
+ public MulticastConvergenceThread(final ReplyFuture rf,
+ final AbstractSLPRequestMessage req) {
super("MulticastConvergenceThread");
- replyfuture=rf;
- msg=req;
+ replyfuture = rf;
+ msg = req;
start();
}
-
public void run() {
- //System.out.println("Entering MCC Algorithm...");
-// One receiver for each interface on which mulitcast requests are sent
- AprDatagramAcceptor[] recv = new AprDatagramAcceptor[myIPs.length];
- AprDatagramSession[] sessions = new AprDatagramSession[myIPs.length];
-// number of interfaces that don't work correctly
+ // System.out.println("Entering MCC Algorithm...");
+ // One receiver for each interface on which multicast requests are
+ // sent
+ final AprDatagramAcceptor[] recv = new AprDatagramAcceptor[myIPs.length];
+ final AprDatagramSession[] sessions = new AprDatagramSession[myIPs.length];
+ // number of interfaces that don't work correctly
int failedInterfaces = 0;
try {
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
- List<AbstractSLPMessage> replyQueue = new ArrayList<AbstractSLPMessage>();
- List<String> responders = new ArrayList<String>();
- if (msg.getPrevResponders()!=null){
- for (String r : msg.getPrevResponders()){
+ final List<AbstractSLPMessage> replyQueue = new ArrayList<AbstractSLPMessage>();
+ final List<String> responders = new ArrayList<String>();
+ if (msg.getPrevResponders() != null) {
+ for (final String r : msg.getPrevResponders()) {
responders.add(r);
}
}
- List<String> responses = new ArrayList<String>();
+ final List<String> responses = new ArrayList<String>();
if (msg.getXid() == 0) {
msg.setXid(SLPCore.nextXid());
}
// register the reply queue as listener
- Integer queryXID = new Integer(msg.getXid());
+ final Integer queryXID = new Integer(msg.getXid());
synchronized (replyListeners) {
replyListeners.put(queryXID, replyQueue);
}
-
-
- // send to localhost, in case the OS does not support multicast over
+ // send to localhost, in case the OS does not support multicast
+ // over
// loopback which can fail if no SA is running locally
try {
- InetSocketAddress local = new InetSocketAddress(LOCALHOST.getHostAddress(),SLP_PORT);
- replyQueue.add(sendMessageTCP(msg,local,true));
- } catch (ServiceLocationException e) {
+ final InetSocketAddress local = new InetSocketAddress(
+ LOCALHOST.getHostAddress(), SLP_PORT);
+ replyQueue.add(sendMessageTCP(msg, local, true));
+ } catch (final ServiceLocationException e) {
}
msg.setMulticast(true);
- InetSocketAddress remoteaddr = new InetSocketAddress(MCAST_ADDRESS.getHostAddress(),SLP_PORT);
+ final InetSocketAddress remoteaddr = new InetSocketAddress(
+ MCAST_ADDRESS.getHostAddress(), SLP_PORT);
AbstractSLPReplyMessage reply;
-
for (int i = 0; i < myIPs.length; i++) {
try {
// initialize the sessions and the receivers
- InetSocketAddress localaddr = new InetSocketAddress(myIPs[i],0);
+ InetSocketAddress localaddr = new InetSocketAddress(
+ myIPs[i], 0);
- ConnectFuture connFuture = senderList[i].connect(remoteaddr);
+ final ConnectFuture connFuture = senderList[i]
+ .connect(remoteaddr);
try {
connFuture.await(CONFIG.getDatagramMaxWait());
- } catch (InterruptedException ie){
+ } catch (final InterruptedException ie) {
}
- if (!connFuture.isConnected()){
- platform.logError("Failed to send over udp: "+msg+" to "+remoteaddr.getAddress().getHostAddress()+":"+remoteaddr.getPort());
+ if (!connFuture.isConnected()) {
+ platform.logError("Failed to send over udp: " + msg
+ + " to "
+ + remoteaddr.getAddress().getHostAddress()
+ + ":" + remoteaddr.getPort());
i--;
failedInterfaces++;
continue;
- }
- sessions[i] = (AprDatagramSession) connFuture.getSession();
- localaddr = new InetSocketAddress(myIPs[i],((InetSocketAddress)sessions[i].getLocalAddress()).getPort());
+ }
+ sessions[i] = (AprDatagramSession) connFuture
+ .getSession();
+ localaddr = new InetSocketAddress(myIPs[i],
+ ((InetSocketAddress) sessions[i]
+ .getLocalAddress()).getPort());
recv[i] = initReceiver(localaddr);
- } catch (ServiceLocationException sle){
- // the receiver for this interface couldn't get started...
+ } catch (final ServiceLocationException sle) {
+ // the receiver for this interface couldn't get
+ // started...
failedInterfaces++;
i--;
continue;
}
- }
-
+ }
// the multicast convergence algorithm
- long totalTimeout = System.currentTimeMillis()
- + CONFIG.getMcastMaxWait();
- int[] transmissionSchedule = SLPCore.CONFIG.getMcastTimeouts();
+ final long totalTimeout = System.currentTimeMillis()
+ + CONFIG.getMcastMaxWait();
+ final int[] transmissionSchedule = SLPCore.CONFIG
+ .getMcastTimeouts();
int retryCounter = 0;
long nextTimeout;
int failCounter = 0;
boolean seenNew = false;
- boolean seenLocalResponse = false;
+ boolean seenLocalResponse = false;
nextTimeout = System.currentTimeMillis()
- + transmissionSchedule[retryCounter];
+ + transmissionSchedule[retryCounter];
while (!Thread.currentThread().isInterrupted()
&& totalTimeout > System.currentTimeMillis()
@@ -1299,30 +1370,31 @@
&& retryCounter < transmissionSchedule.length
&& failCounter < CONFIG.getConvergenceFailerCount()) {
platform.logDebug("[MCC]: starting round " + retryCounter);
- //System.out.println("Entering round "+retryCounter);
- msg.setPrevResponders((String[]) responders.toArray(new String[]{}));
+ // System.out.println("Entering round "+retryCounter);
+ msg.setPrevResponders((String[]) responders
+ .toArray(new String[] {}));
// finish convergence in case the message size exceeds the MTU
if (msg.getSize() > CONFIG.getMTU()) {
- for (int i=0;i<sessions.length-failedInterfaces;i++){
+ for (int i = 0; i < sessions.length - failedInterfaces; i++) {
sessions[i].close(true);
recv[i].dispose();
- replyfuture.setDone(msg.getScopes(),false);
+ replyfuture.setDone(msg.getScopes(), false);
}
break;
}
-
if (msg.getSize() > CONFIG.getMTU() || TCP_ONLY) {
try {
sendMessageTCP(msg, remoteaddr, false);
- } catch (ServiceLocationException sle){
- //couldn't send message over tcp...
+ } catch (final ServiceLocationException sle) {
+ // couldn't send message over tcp...
}
}
- for (int i = 0; i < sessions.length-failedInterfaces; i++){
+ for (int i = 0; i < sessions.length - failedInterfaces; i++) {
sessions[i].write(msg);
- platform.logTraceMessage("SENT " + msg + " on interface " + myIPs[i]);
+ platform.logTraceMessage("SENT " + msg
+ + " on interface " + myIPs[i]);
}
/**
@@ -1332,7 +1404,7 @@
*/
try {
Thread.sleep(transmissionSchedule[retryCounter]);
- } catch (InterruptedException dontcare) {
+ } catch (final InterruptedException dontcare) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
@@ -1342,14 +1414,17 @@
if (replyQueue.isEmpty()) {
failCounter++;
nextTimeout = System.currentTimeMillis()
- + transmissionSchedule[retryCounter++];
+ + transmissionSchedule[retryCounter++];
continue;
}
while (!replyQueue.isEmpty()) {
- reply = (AbstractSLPReplyMessage) replyQueue.remove(0);
- // silently drop duplicate responses, process only new results
+ reply = (AbstractSLPReplyMessage) replyQueue
+ .remove(0);
+ // silently drop duplicate responses, process only
+ // new results
if (!responders.contains(reply.getSource())) {
- if (isLocalResponder(InetAddress.getByName(reply.getSource()))) {
+ if (isLocalResponder(InetAddress
+ .getByName(reply.getSource()))) {
if (seenLocalResponse) {
continue;
} else {
@@ -1361,7 +1436,6 @@
responses.addAll(reply.getResultAsList());
replyfuture.add(reply);
-
}
}
@@ -1372,14 +1446,13 @@
}
}
nextTimeout = System.currentTimeMillis()
- + transmissionSchedule[retryCounter++];
+ + transmissionSchedule[retryCounter++];
}
- for (int i=0;i<sessions.length-failedInterfaces;i++){
+ for (int i = 0; i < sessions.length - failedInterfaces; i++) {
sessions[i].close(true);
recv[i].dispose();
}
-
// we are done, remove the listener queue
synchronized (replyListeners) {
replyListeners.remove(queryXID);
@@ -1390,12 +1463,12 @@
+ (System.currentTimeMillis() - start)
+ " ms, result: " + responses);
- replyfuture.setDone(msg.getScopes(),false);
- } catch (IOException ioe) {
- for (int i=0;i<recv.length-failedInterfaces;i++){
+ replyfuture.setDone(msg.getScopes(), false);
+ } catch (final IOException ioe) {
+ for (int i = 0; i < recv.length - failedInterfaces; i++) {
sessions[i].close(true);
recv[i].dispose();
- //replyfuture.setDone();
+ // replyfuture.setDone();
}
platform.logDebug(ioe.getMessage(), ioe);
@@ -1403,20 +1476,16 @@
}
-
-
-
}
- public static void shutdownCore(){
- if (daDaemon!=null){
+ public static void shutdownCore() {
+ if (daDaemon != null) {
daDaemon.shutdown();
}
- if (daemon!=null){
+ if (daemon != null) {
shutdownAdvertiser();
}
-
- }
+ }
}
Modified: directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPDaemon.java
URL: http://svn.apache.org/viewvc/directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPDaemon.java?rev=782980&r1=782979&r2=782980&view=diff
==============================================================================
--- directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPDaemon.java (original)
+++ directory/sandbox/slp/src/main/java/org/apache/directory/slp/impl/SLPDaemon.java Tue Jun 9 12:50:45 2009
@@ -24,10 +24,6 @@
import org.apache.directory.slp.messages.AbstractSLPReplyMessage;
import org.apache.directory.slp.messages.DAAdvertisementMessage;
-
-
-
-
/**
* the SLPDaemon interface. Factored out to make the daemon part optional as
* part of the jSLP modularity.