You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/07 20:00:29 UTC

svn commit: r712224 [3/3] - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/index/ src/main/java/org/apache/kahadb/journal/ src/main/java/org/apache/kahadb/page/ src/main/java/org/apache/kahadb/replicati...

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Nov  7 11:00:25 2008
@@ -1,225 +1,225 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication.transport;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.kahadb.replication.ReplicationFrame;
-import org.apache.kahadb.replication.pb.PBHeader;
-import org.apache.kahadb.replication.pb.PBJournalLocation;
-import org.apache.kahadb.replication.pb.PBSlaveInit;
-import org.apache.kahadb.replication.pb.PBType;
-
-public class KDBRTransportTest extends TestCase {
-
-	private static final String KDBR_URI = "kdbr://localhost:61618";
-	private List<Object> serverQueue;
-	private List<Object> clientQueue;
-	private List<Transport> serverTransports;
-	private TransportServer server;
-	private Transport client;
-	
-	private Object commandLatchMutex = new Object();
-	private CountDownLatch commandLatch;
-	
-	protected void releaseCommandLatch() {
-		synchronized( commandLatchMutex ) {
-			if( commandLatch == null ) {
-				return;
-			}
-			commandLatch.countDown();
-			commandLatch=null;
-		}
-	}
-	
-	protected CountDownLatch getCommandLatch() {
-		synchronized( commandLatchMutex ) {
-			if( commandLatch == null ) {
-				commandLatch = new CountDownLatch(1);
-			}
-			return commandLatch;
-		}
-	}
-	
-	@Override
-	protected void setUp() throws Exception {
-		serverQueue = Collections.synchronizedList(new ArrayList<Object>()); 
-		clientQueue = Collections.synchronizedList(new ArrayList<Object>()); 
-		serverTransports = Collections.synchronizedList(new ArrayList<Transport>()); 
-		
-		// Setup a server
-		server = TransportFactory.bind(new URI(KDBR_URI));
-		server.setAcceptListener(new TransportAcceptListener() {
-			public void onAccept(Transport transport) {
-				try {
-					transport.setTransportListener(new TransportListener() {
-						public void onCommand(Object command) {
-							try {
-								serverQueue.add(command);
-								process(command);
-								releaseCommandLatch();
-							} catch (IOException e) {
-								onException(e);
-							}
-						}
-
-						public void onException(IOException error) {
-							serverQueue.add(error);
-							serverTransports.remove(this);
-							releaseCommandLatch();
-						}
-
-						public void transportInterupted() {
-						}
-
-						public void transportResumed() {
-						}
-					});
-					transport.start();
-					serverTransports.add(transport);
-				} catch (Exception e) {
-					onAcceptError(e);
-				}
-			}
-
-			public void onAcceptError(Exception error) {
-				error.printStackTrace();
-			}
-		});
-		server.start();
-
-		// Connect a client.
-		client = TransportFactory.connect(new URI(KDBR_URI));
-		client.setTransportListener(new TransportListener() {
-			public void onCommand(Object command) {
-				clientQueue.add(command);
-				releaseCommandLatch();
-			}
-
-			public void onException(IOException error) {
-				clientQueue.add(error);
-				releaseCommandLatch();
-			}
-
-			public void transportInterupted() {
-			}
-
-			public void transportResumed() {
-			}
-		});
-		client.start();	
-	}
-	
-	@Override
-	protected void tearDown() throws Exception {
-		client.stop();
-		server.stop();
-	}
-
-	private void process(Object command) throws IOException {		
-		ReplicationFrame frame = (ReplicationFrame) command;
-		// Since we are processing the commands async in this test case we need to full read the stream before
-		// returning since will be be used to read the next command once we return.
-		if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
-			InputStream ais = (InputStream) frame.getPayload();
-			byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
-			readFully(ais, actualPayload);
-			frame.setPayload(actualPayload);
-		}
-	}
-	
-	/**
-	 * Test a frame that has a streaming payload.
-	 * 
-	 * @throws Exception
-	 */
-	public void testFileTransferResponse() throws Exception {
-
-		byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10}; 
-
-		ReplicationFrame expected = new ReplicationFrame();
-		expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
-		ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
-		expected.setPayload(is);
-		
-		CountDownLatch latch = getCommandLatch();
-		client.oneway(expected);
-		is.close();
-		latch.await(2, TimeUnit.SECONDS);
-		
-		assertEquals(1, serverQueue.size());
-		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
-		
-		assertEquals(expected.getHeader(), actual.getHeader());		
-		assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
-		
-	}
-
-	
-	/**
-	 * Test out sending a frame that has a PB payload.
-	 * 
-	 * @throws Exception
-	 */
-	public void testPBSlaveInitFrame() throws Exception {
-
-
-		ReplicationFrame expected = new ReplicationFrame();
-		expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
-		expected.setPayload(new PBSlaveInit().setNodeId("foo"));
-		
-		CountDownLatch latch = getCommandLatch();
-		client.oneway(expected);
-		latch.await(2, TimeUnit.SECONDS);
-		
-		assertEquals(1, serverQueue.size());
-		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
-		
-		assertEquals(expected.getHeader(), actual.getHeader());
-		assertEquals(expected.getPayload(), actual.getPayload());
-		
-	}
-
-
-	private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
-		int pos = 0;
-		int c;
-		while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
-			pos += c;
-		}
-		if( pos  < actualPayload.length ) {
-			throw new EOFException();
-		}
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBType;
+
+public class KDBRTransportTest extends TestCase {
+
+	private static final String KDBR_URI = "kdbr://localhost:61618";
+	private List<Object> serverQueue;
+	private List<Object> clientQueue;
+	private List<Transport> serverTransports;
+	private TransportServer server;
+	private Transport client;
+	
+	private Object commandLatchMutex = new Object();
+	private CountDownLatch commandLatch;
+	
+	protected void releaseCommandLatch() {
+		synchronized( commandLatchMutex ) {
+			if( commandLatch == null ) {
+				return;
+			}
+			commandLatch.countDown();
+			commandLatch=null;
+		}
+	}
+	
+	protected CountDownLatch getCommandLatch() {
+		synchronized( commandLatchMutex ) {
+			if( commandLatch == null ) {
+				commandLatch = new CountDownLatch(1);
+			}
+			return commandLatch;
+		}
+	}
+	
+	@Override
+	protected void setUp() throws Exception {
+		serverQueue = Collections.synchronizedList(new ArrayList<Object>()); 
+		clientQueue = Collections.synchronizedList(new ArrayList<Object>()); 
+		serverTransports = Collections.synchronizedList(new ArrayList<Transport>()); 
+		
+		// Setup a server
+		server = TransportFactory.bind(new URI(KDBR_URI));
+		server.setAcceptListener(new TransportAcceptListener() {
+			public void onAccept(Transport transport) {
+				try {
+					transport.setTransportListener(new TransportListener() {
+						public void onCommand(Object command) {
+							try {
+								serverQueue.add(command);
+								process(command);
+								releaseCommandLatch();
+							} catch (IOException e) {
+								onException(e);
+							}
+						}
+
+						public void onException(IOException error) {
+							serverQueue.add(error);
+							serverTransports.remove(this);
+							releaseCommandLatch();
+						}
+
+						public void transportInterupted() {
+						}
+
+						public void transportResumed() {
+						}
+					});
+					transport.start();
+					serverTransports.add(transport);
+				} catch (Exception e) {
+					onAcceptError(e);
+				}
+			}
+
+			public void onAcceptError(Exception error) {
+				error.printStackTrace();
+			}
+		});
+		server.start();
+
+		// Connect a client.
+		client = TransportFactory.connect(new URI(KDBR_URI));
+		client.setTransportListener(new TransportListener() {
+			public void onCommand(Object command) {
+				clientQueue.add(command);
+				releaseCommandLatch();
+			}
+
+			public void onException(IOException error) {
+				clientQueue.add(error);
+				releaseCommandLatch();
+			}
+
+			public void transportInterupted() {
+			}
+
+			public void transportResumed() {
+			}
+		});
+		client.start();	
+	}
+	
+	@Override
+	protected void tearDown() throws Exception {
+		client.stop();
+		server.stop();
+	}
+
+	private void process(Object command) throws IOException {		
+		ReplicationFrame frame = (ReplicationFrame) command;
+		// Since we are processing the commands async in this test case, we need to fully read the stream before
+		// returning, since it will be used to read the next command once we return.
+		if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
+			InputStream ais = (InputStream) frame.getPayload();
+			byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
+			readFully(ais, actualPayload);
+			frame.setPayload(actualPayload);
+		}
+	}
+	
+	/**
+	 * Test a frame that has a streaming payload.
+	 * 
+	 * @throws Exception
+	 */
+	public void testFileTransferResponse() throws Exception {
+
+		byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10}; 
+
+		ReplicationFrame expected = new ReplicationFrame();
+		expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
+		ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
+		expected.setPayload(is);
+		
+		CountDownLatch latch = getCommandLatch();
+		client.oneway(expected);
+		is.close();
+		latch.await(2, TimeUnit.SECONDS);
+		
+		assertEquals(1, serverQueue.size());
+		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+		
+		assertEquals(expected.getHeader(), actual.getHeader());		
+		assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
+		
+	}
+
+	
+	/**
+	 * Test out sending a frame that has a PB payload.
+	 * 
+	 * @throws Exception
+	 */
+	public void testPBSlaveInitFrame() throws Exception {
+
+
+		ReplicationFrame expected = new ReplicationFrame();
+		expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+		expected.setPayload(new PBSlaveInit().setNodeId("foo"));
+		
+		CountDownLatch latch = getCommandLatch();
+		client.oneway(expected);
+		latch.await(2, TimeUnit.SECONDS);
+		
+		assertEquals(1, serverQueue.size());
+		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+		
+		assertEquals(expected.getHeader(), actual.getHeader());
+		assertEquals(expected.getPayload(), actual.getPayload());
+		
+	}
+
+
+	private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
+		int pos = 0;
+		int c;
+		while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
+			pos += c;
+		}
+		if( pos  < actualPayload.length ) {
+			throw new EOFException();
+		}
+	}
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java Fri Nov  7 11:00:25 2008
@@ -22,7 +22,7 @@
 import org.apache.kahadb.store.KahaDBStore;
 
 /**
- * @version $Revision: 1.3 $
+ * @version $Revision$
  */
 public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {
 

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java Fri Nov  7 11:00:25 2008
@@ -22,7 +22,7 @@
 import org.apache.kahadb.store.KahaDBStore;
 
 /**
- * @version $Revision: 1.3 $
+ * @version $Revision$
  */
 public class KahaStoreQueueTest extends SimpleQueueTest {
 

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native