You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by pc...@apache.org on 2007/05/11 18:12:08 UTC
svn commit: r537221 -
/incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
Author: pcl
Date: Fri May 11 09:12:07 2007
New Revision: 537221
URL: http://svn.apache.org/viewvc?view=rev&rev=537221
Log:
OPENJPA-230. Updated patch based on an out-of-band patch from Vishal. This version avoids interrupts while still handling guaranteed delivery of messages in the queue.
Modified:
incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
Modified: incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
URL: http://svn.apache.org/viewvc/incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java?view=diff&rev=537221&r1=537220&r2=537221
==============================================================================
--- incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java (original)
+++ incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java Fri May 11 09:12:07 2007
@@ -25,7 +25,6 @@
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OptionalDataException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
@@ -40,6 +39,8 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
+import java.util.List;
+import java.util.Collections;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
@@ -50,6 +51,7 @@
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.Serialization;
import org.apache.openjpa.lib.util.concurrent.ReentrantLock;
+import org.apache.openjpa.lib.util.concurrent.Concurrent;
import serp.util.Strings;
@@ -68,14 +70,14 @@
private static final int DEFAULT_PORT = 5636;
- private static Localizer s_loc = Localizer.forPackage
+ private static final Localizer s_loc = Localizer.forPackage
(TCPRemoteCommitProvider.class);
private static long s_idSequence = System.currentTimeMillis();
// A map of listen ports to listeners in this JVM. We might
// want to look into allowing same port, different interface --
// that is not currently possible in a single JVM.
- private static Map s_portListenerMap = new HashMap();
+ private static final Map s_portListenerMap = new HashMap();
private long _id;
private byte[] _localhost;
@@ -85,7 +87,8 @@
private int _recoveryTimeMillis = 15000;
private TCPPortListener _listener;
private BroadcastQueue _broadcastQueue = new BroadcastQueue();
- private LinkedList _broadcastThreads = new LinkedList();
+ private final List _broadcastThreads = Collections.synchronizedList(
+ new LinkedList());
private ArrayList _addresses = new ArrayList();
private ReentrantLock _addressesLock;
@@ -177,7 +180,7 @@
// Threads will not end until they send another pk.
for (int i = numBroadcastThreads; i < cur; i++) {
BroadcastWorkerThread worker = (BroadcastWorkerThread)
- _broadcastThreads.removeFirst();
+ _broadcastThreads.remove(0);
worker.setRunning(false);
}
} else if (cur < numBroadcastThreads) {
@@ -186,7 +189,7 @@
BroadcastWorkerThread wt = new BroadcastWorkerThread();
wt.setDaemon(true);
wt.start();
- _broadcastThreads.addLast(wt);
+ _broadcastThreads.add(wt);
}
}
}
@@ -364,6 +367,18 @@
if (_listener != null)
_listener.removeProvider(this);
+ // Remove Broadcast Threads then close sockets.
+ _broadcastQueue.close();
+
+ // Wait for _broadcastThreads to get cleaned up.
+ while(!_broadcastThreads.isEmpty()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ // Ignore.
+ }
+ }
+
_addressesLock.lock();
try {
for (Iterator iter = _addresses.iterator(); iter.hasNext();)
@@ -371,16 +386,6 @@
} finally {
_addressesLock.unlock();
}
-
- // We are done transmitting. Interrupt any worker threads.
- synchronized (_broadcastThreads) {
- Thread worker;
- for (Iterator iter = _broadcastThreads.iterator();
- iter.hasNext();) {
- worker = (Thread) iter.next();
- worker.interrupt();
- }
- }
}
/**
@@ -390,18 +395,38 @@
*/
private static class BroadcastQueue {
- LinkedList _packetQueue = new LinkedList();
+ private LinkedList _packetQueue = new LinkedList();
+ private boolean _closed = false;
+
+ public synchronized void close() {
+ _closed = true;
+ notifyAll();
+ }
+
+ public synchronized boolean isClosed() {
+ return _closed;
+ }
public synchronized void addPacket(byte[] bytes) {
_packetQueue.addLast(bytes);
notify();
}
+ /**
+ * @return the bytes defining the packet to process, or
+ * <code>null</code> if the queue has been closed and is empty.
+ */
public synchronized byte[] removePacket()
throws InterruptedException {
- while (_packetQueue.isEmpty())
+ // only wait if the queue is still open. This allows processing
+ // of events in the queue to continue, while avoiding sleeping
+ // during shutdown.
+ while (!_closed && _packetQueue.isEmpty())
wait();
- return (byte[]) _packetQueue.removeFirst();
+ if (_packetQueue.isEmpty())
+ return null;
+ else
+ return (byte[]) _packetQueue.removeFirst();
}
}
@@ -416,19 +441,28 @@
public void run() {
while (_keepRunning) {
try {
- // This will block until there is a packet to send.
+ // This will block until there is a packet to send, or
+ // until the queue is closed.
byte[] bytes = _broadcastQueue.removePacket();
- sendUpdatePacket(bytes);
+ if (bytes != null)
+ sendUpdatePacket(bytes);
+ else if (_broadcastQueue.isClosed())
+ _keepRunning = false;
} catch (InterruptedException e) {
// End the thread.
break;
}
}
+ remove();
}
public void setRunning(boolean keepRunning) {
_keepRunning = keepRunning;
}
+
+ private void remove() {
+ _broadcastThreads.remove(this);
+ }
}
/**
@@ -437,14 +471,11 @@
private static class TCPPortListener
implements Runnable {
- private static Localizer s_loc = Localizer.forPackage
- (TCPPortListener.class);
-
private final Log _log;
private ServerSocket _receiveSocket;
private Thread _acceptThread;
private Set _receiverThreads = new HashSet();
- private Set _providers = new HashSet();
+ private final Set _providers = new HashSet();
/**
* Cache the local IP address
@@ -466,7 +497,7 @@
* Construct a new TCPPortListener configured to use the specified port.
*/
private TCPPortListener(int port, Log log)
- throws UnknownHostException, IOException {
+ throws IOException {
_port = port;
_log = log;
_receiveSocket = new ServerSocket(_port);
@@ -637,19 +668,19 @@
+ ":" + _s.getPort()));
}
break;
- } catch (Exception e) {
+ } catch (Throwable e) {
if (_log.isWarnEnabled())
_log.warn(s_loc.get("tcp-receive-error"), e);
break;
- } catch (Throwable t) {
}
}
// We are done receiving on this socket and this worker
// thread is terminating.
try {
_in.close();
- _s.close();
- } catch (Exception e) {
+ if (_s != null)
+ _s.close();
+ } catch (IOException e) {
_log.warn(s_loc.get("tcp-close-socket-error",
_s.getInetAddress().getHostAddress() + ":"
+ _s.getPort()), e);
@@ -658,11 +689,10 @@
/**
* Process an {@link InputStream} containing objects written
- * by {@link TCPRemoteCommitProvider#broadcastCommitInfo}.
+ * by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
*/
private void handle(InputStream in)
- throws IOException, ClassNotFoundException,
- OptionalDataException {
+ throws IOException, ClassNotFoundException {
// This will block waiting for the next
ObjectInputStream ois =
new Serialization.ClassResolvingObjectInputStream(in);