You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/07 20:00:29 UTC
svn commit: r712224 [3/3] - in /activemq/sandbox/kahadb: ./
src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/index/
src/main/java/org/apache/kahadb/journal/
src/main/java/org/apache/kahadb/page/
src/main/java/org/apache/kahadb/replicati...
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Nov 7 11:00:25 2008
@@ -1,225 +1,225 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication.transport;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.kahadb.replication.ReplicationFrame;
-import org.apache.kahadb.replication.pb.PBHeader;
-import org.apache.kahadb.replication.pb.PBJournalLocation;
-import org.apache.kahadb.replication.pb.PBSlaveInit;
-import org.apache.kahadb.replication.pb.PBType;
-
-public class KDBRTransportTest extends TestCase {
-
- private static final String KDBR_URI = "kdbr://localhost:61618";
- private List<Object> serverQueue;
- private List<Object> clientQueue;
- private List<Transport> serverTransports;
- private TransportServer server;
- private Transport client;
-
- private Object commandLatchMutex = new Object();
- private CountDownLatch commandLatch;
-
- protected void releaseCommandLatch() {
- synchronized( commandLatchMutex ) {
- if( commandLatch == null ) {
- return;
- }
- commandLatch.countDown();
- commandLatch=null;
- }
- }
-
- protected CountDownLatch getCommandLatch() {
- synchronized( commandLatchMutex ) {
- if( commandLatch == null ) {
- commandLatch = new CountDownLatch(1);
- }
- return commandLatch;
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- serverQueue = Collections.synchronizedList(new ArrayList<Object>());
- clientQueue = Collections.synchronizedList(new ArrayList<Object>());
- serverTransports = Collections.synchronizedList(new ArrayList<Transport>());
-
- // Setup a server
- server = TransportFactory.bind(new URI(KDBR_URI));
- server.setAcceptListener(new TransportAcceptListener() {
- public void onAccept(Transport transport) {
- try {
- transport.setTransportListener(new TransportListener() {
- public void onCommand(Object command) {
- try {
- serverQueue.add(command);
- process(command);
- releaseCommandLatch();
- } catch (IOException e) {
- onException(e);
- }
- }
-
- public void onException(IOException error) {
- serverQueue.add(error);
- serverTransports.remove(this);
- releaseCommandLatch();
- }
-
- public void transportInterupted() {
- }
-
- public void transportResumed() {
- }
- });
- transport.start();
- serverTransports.add(transport);
- } catch (Exception e) {
- onAcceptError(e);
- }
- }
-
- public void onAcceptError(Exception error) {
- error.printStackTrace();
- }
- });
- server.start();
-
- // Connect a client.
- client = TransportFactory.connect(new URI(KDBR_URI));
- client.setTransportListener(new TransportListener() {
- public void onCommand(Object command) {
- clientQueue.add(command);
- releaseCommandLatch();
- }
-
- public void onException(IOException error) {
- clientQueue.add(error);
- releaseCommandLatch();
- }
-
- public void transportInterupted() {
- }
-
- public void transportResumed() {
- }
- });
- client.start();
- }
-
- @Override
- protected void tearDown() throws Exception {
- client.stop();
- server.stop();
- }
-
- private void process(Object command) throws IOException {
- ReplicationFrame frame = (ReplicationFrame) command;
- // Since we are processing the commands async in this test case we need to full read the stream before
- // returning since will be be used to read the next command once we return.
- if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
- InputStream ais = (InputStream) frame.getPayload();
- byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
- readFully(ais, actualPayload);
- frame.setPayload(actualPayload);
- }
- }
-
- /**
- * Test a frame that has a streaming payload.
- *
- * @throws Exception
- */
- public void testFileTransferResponse() throws Exception {
-
- byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10};
-
- ReplicationFrame expected = new ReplicationFrame();
- expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
- ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
- expected.setPayload(is);
-
- CountDownLatch latch = getCommandLatch();
- client.oneway(expected);
- is.close();
- latch.await(2, TimeUnit.SECONDS);
-
- assertEquals(1, serverQueue.size());
- ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
-
- assertEquals(expected.getHeader(), actual.getHeader());
- assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
-
- }
-
-
- /**
- * Test out sending a frame that has a PB payload.
- *
- * @throws Exception
- */
- public void testPBSlaveInitFrame() throws Exception {
-
-
- ReplicationFrame expected = new ReplicationFrame();
- expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
- expected.setPayload(new PBSlaveInit().setNodeId("foo"));
-
- CountDownLatch latch = getCommandLatch();
- client.oneway(expected);
- latch.await(2, TimeUnit.SECONDS);
-
- assertEquals(1, serverQueue.size());
- ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
-
- assertEquals(expected.getHeader(), actual.getHeader());
- assertEquals(expected.getPayload(), actual.getPayload());
-
- }
-
-
- private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
- int pos = 0;
- int c;
- while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
- pos += c;
- }
- if( pos < actualPayload.length ) {
- throw new EOFException();
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBType;
+
+public class KDBRTransportTest extends TestCase {
+
+ private static final String KDBR_URI = "kdbr://localhost:61618";
+ private List<Object> serverQueue;
+ private List<Object> clientQueue;
+ private List<Transport> serverTransports;
+ private TransportServer server;
+ private Transport client;
+
+ private Object commandLatchMutex = new Object();
+ private CountDownLatch commandLatch;
+
+ protected void releaseCommandLatch() {
+ synchronized( commandLatchMutex ) {
+ if( commandLatch == null ) {
+ return;
+ }
+ commandLatch.countDown();
+ commandLatch=null;
+ }
+ }
+
+ protected CountDownLatch getCommandLatch() {
+ synchronized( commandLatchMutex ) {
+ if( commandLatch == null ) {
+ commandLatch = new CountDownLatch(1);
+ }
+ return commandLatch;
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ serverQueue = Collections.synchronizedList(new ArrayList<Object>());
+ clientQueue = Collections.synchronizedList(new ArrayList<Object>());
+ serverTransports = Collections.synchronizedList(new ArrayList<Transport>());
+
+ // Setup a server
+ server = TransportFactory.bind(new URI(KDBR_URI));
+ server.setAcceptListener(new TransportAcceptListener() {
+ public void onAccept(Transport transport) {
+ try {
+ transport.setTransportListener(new TransportListener() {
+ public void onCommand(Object command) {
+ try {
+ serverQueue.add(command);
+ process(command);
+ releaseCommandLatch();
+ } catch (IOException e) {
+ onException(e);
+ }
+ }
+
+ public void onException(IOException error) {
+ serverQueue.add(error);
+ serverTransports.remove(this);
+ releaseCommandLatch();
+ }
+
+ public void transportInterupted() {
+ }
+
+ public void transportResumed() {
+ }
+ });
+ transport.start();
+ serverTransports.add(transport);
+ } catch (Exception e) {
+ onAcceptError(e);
+ }
+ }
+
+ public void onAcceptError(Exception error) {
+ error.printStackTrace();
+ }
+ });
+ server.start();
+
+ // Connect a client.
+ client = TransportFactory.connect(new URI(KDBR_URI));
+ client.setTransportListener(new TransportListener() {
+ public void onCommand(Object command) {
+ clientQueue.add(command);
+ releaseCommandLatch();
+ }
+
+ public void onException(IOException error) {
+ clientQueue.add(error);
+ releaseCommandLatch();
+ }
+
+ public void transportInterupted() {
+ }
+
+ public void transportResumed() {
+ }
+ });
+ client.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ client.stop();
+ server.stop();
+ }
+
+ private void process(Object command) throws IOException {
+ ReplicationFrame frame = (ReplicationFrame) command;
+ // Since we are processing the commands async in this test case, we need to fully read the stream before
+ // returning, since it will be used to read the next command once we return.
+ if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
+ InputStream ais = (InputStream) frame.getPayload();
+ byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
+ readFully(ais, actualPayload);
+ frame.setPayload(actualPayload);
+ }
+ }
+
+ /**
+ * Test a frame that has a streaming payload.
+ *
+ * @throws Exception
+ */
+ public void testFileTransferResponse() throws Exception {
+
+ byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10};
+
+ ReplicationFrame expected = new ReplicationFrame();
+ expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
+ ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
+ expected.setPayload(is);
+
+ CountDownLatch latch = getCommandLatch();
+ client.oneway(expected);
+ is.close();
+ latch.await(2, TimeUnit.SECONDS);
+
+ assertEquals(1, serverQueue.size());
+ ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+
+ assertEquals(expected.getHeader(), actual.getHeader());
+ assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
+
+ }
+
+
+ /**
+ * Test out sending a frame that has a PB payload.
+ *
+ * @throws Exception
+ */
+ public void testPBSlaveInitFrame() throws Exception {
+
+
+ ReplicationFrame expected = new ReplicationFrame();
+ expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+ expected.setPayload(new PBSlaveInit().setNodeId("foo"));
+
+ CountDownLatch latch = getCommandLatch();
+ client.oneway(expected);
+ latch.await(2, TimeUnit.SECONDS);
+
+ assertEquals(1, serverQueue.size());
+ ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+
+ assertEquals(expected.getHeader(), actual.getHeader());
+ assertEquals(expected.getPayload(), actual.getPayload());
+
+ }
+
+
+ private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
+ int pos = 0;
+ int c;
+ while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
+ pos += c;
+ }
+ if( pos < actualPayload.length ) {
+ throw new EOFException();
+ }
+ }
+}
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java Fri Nov 7 11:00:25 2008
@@ -22,7 +22,7 @@
import org.apache.kahadb.store.KahaDBStore;
/**
- * @version $Revision: 1.3 $
+ * @version $Revision$
*/
public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java Fri Nov 7 11:00:25 2008
@@ -22,7 +22,7 @@
import org.apache.kahadb.store.KahaDBStore;
/**
- * @version $Revision: 1.3 $
+ * @version $Revision$
*/
public class KahaStoreQueueTest extends SimpleQueueTest {
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native