You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/07 20:00:29 UTC
svn commit: r712224 [2/3] - in /activemq/sandbox/kahadb: ./
src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/index/
src/main/java/org/apache/kahadb/journal/
src/main/java/org/apache/kahadb/page/
src/main/java/org/apache/kahadb/replicati...
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Nov 7 11:00:25 2008
@@ -1,275 +1,275 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.replication.pb.PBFileInfo;
-import org.apache.kahadb.replication.pb.PBHeader;
-import org.apache.kahadb.replication.pb.PBSlaveInit;
-import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
-import org.apache.kahadb.replication.pb.PBType;
-
-public class ReplicationSlave implements Service, ClusterListener, TransportListener {
- private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
-
- private final ReplicationServer replicationServer;
- private Transport transport;
-
- public ReplicationSlave(ReplicationServer replicationServer) {
- this.replicationServer = replicationServer;
- }
-
- public void start() throws Exception {
- transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
- transport.setTransportListener(this);
- transport.start();
-
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
- PBSlaveInit payload = new PBSlaveInit();
- payload.setNodeId(replicationServer.getNodeId());
- frame.setPayload(payload);
- LOG.info("Sending master slave init command: "+payload);
- transport.oneway(frame);
-
- }
-
- public void stop() throws Exception {
- }
-
- public void onClusterChange(ClusterState config) {
- }
-
- public void onCommand(Object command) {
- try {
- ReplicationFrame frame = (ReplicationFrame) command;
- switch (frame.getHeader().getType()) {
- case SLAVE_INIT_RESPONSE:
- onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
- break;
- }
- } catch (Exception e) {
- failed(e);
- }
- }
-
- public void onException(IOException error) {
- failed(error);
- }
-
- public void failed(Exception error) {
- try {
- error.printStackTrace();
- stop();
- } catch (Exception ignore) {
- }
- }
-
- public void transportInterupted() {
- }
-
- public void transportResumed() {
- }
-
- private Object transferMutex = new Object();
- private LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
-
- private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
- LOG.info("Got init response: "+response);
- delete(response.getDeleteFilesList());
- synchronized(transferMutex) {
- transferQueue.clear();
- transferQueue.addAll(response.getCopyFilesList());
- }
- addTransferSession();
- }
-
-
- private PBFileInfo dequeueTransferQueue() throws Exception {
- synchronized( transferMutex ) {
- if( transferQueue.isEmpty() ) {
- return null;
- }
- return transferQueue.removeFirst();
- }
- }
-
- LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
-
- private void addTransferSession() throws Exception {
- synchronized( transferMutex ) {
- while( !transferQueue.isEmpty() && transferSessions.size()<5 ) {
- TransferSession transferSession = new TransferSession();
- transferSessions.add(transferSession);
- try {
- transferSession.start();
- } catch (Exception e) {
- transferSessions.remove(transferSession);
- }
- }
- }
- }
-
- class TransferSession implements Service, TransportListener {
-
- Transport transport;
- private PBFileInfo info;
- private File toFile;
- private AtomicBoolean stopped = new AtomicBoolean();
- private long transferStart;
-
- public void start() throws Exception {
- LOG.info("File transfer session started.");
- transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
- transport.setTransportListener(this);
- transport.start();
- sendNextRequestOrStop();
- }
-
- private void sendNextRequestOrStop() {
- try {
- PBFileInfo info = dequeueTransferQueue();
- if( info !=null ) {
-
- toFile = replicationServer.getReplicationFile(info.getName());
- this.info = info;
-
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
- frame.setPayload(info);
-
- LOG.info("Requesting file: "+info.getName());
- transferStart = System.currentTimeMillis();
-
- transport.oneway(frame);
- } else {
- stop();
- }
-
- } catch ( Exception e ) {
- failed(e);
- }
- }
-
- public void stop() throws Exception {
- if( stopped.compareAndSet(false, true) ) {
- LOG.info("File transfer session stopped.");
- synchronized( transferMutex ) {
- if( info!=null ) {
- transferQueue.addLast(info);
- }
- info = null;
- }
- Thread stopThread = new Thread("Transfer Session Shutdown: "+transport.getRemoteAddress()) {
- @Override
- public void run() {
- try {
- transport.stop();
- synchronized( transferMutex ) {
- transferSessions.remove(this);
- addTransferSession();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- stopThread.setDaemon(true);
- stopThread.start();
- }
- }
-
- public void onCommand(Object command) {
- try {
- ReplicationFrame frame = (ReplicationFrame) command;
- InputStream is = (InputStream) frame.getPayload();
- toFile.getParentFile().mkdirs();
- FileOutputStream os = new FileOutputStream( toFile );
- try {
- copy(is, os, frame.getHeader().getPayloadSize());
- long transferTime = System.currentTimeMillis()-this.transferStart;
- float rate = frame.getHeader().getPayloadSize()*transferTime/1024000f;
- LOG.info("File "+info.getName()+" transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
- } finally {
- os.close();
- }
- this.info = null;
- this.toFile = null;
-
- sendNextRequestOrStop();
- } catch (Exception e) {
- failed(e);
- }
- }
-
- public void onException(IOException error) {
- failed(error);
- }
-
- public void failed(Exception error) {
- try {
- if( !stopped.get() ) {
- LOG.warn("Replication session failure: "+transport.getRemoteAddress());
- }
- stop();
- } catch (Exception ignore) {
- }
- }
-
- public void transportInterupted() {
- }
- public void transportResumed() {
- }
-
- }
-
- private void copy(InputStream is, OutputStream os, long length) throws IOException {
- byte buffer[] = new byte[1024 * 4];
- int c=0;
- long pos=0;
- while ( pos <length && ((c = is.read(buffer, 0, (int)Math.min(buffer.length, length-pos))) >= 0) ) {
- os.write(buffer, 0, c);
- pos+=c;
- }
- }
-
- private void delete(List<String> files) {
- for (String fn : files) {
- try {
- replicationServer.getReplicationFile(fn).delete();
- } catch (IOException e) {
- }
- }
- }
-
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+
+/**
+ * Slave side of the replication protocol. On {@link #start()} it connects to the
+ * master named by the cluster state and sends a SLAVE_INIT frame. When the
+ * SLAVE_INIT_RESPONSE arrives it deletes the files the master listed for deletion
+ * and downloads the files listed for copying, using at most
+ * {@link #MAX_TRANSFER_SESSIONS} concurrent {@link TransferSession}s.
+ */
+public class ReplicationSlave implements Service, ClusterListener, TransportListener {
+    private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
+
+    /** Upper bound on the number of file transfer sessions registered at once. */
+    private static final int MAX_TRANSFER_SESSIONS = 5;
+
+    private final ReplicationServer replicationServer;
+    private Transport transport;
+
+    /** Guards {@link #transferQueue} and {@link #transferSessions}. */
+    private final Object transferMutex = new Object();
+    /** Files that still have to be requested from the master. */
+    private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+    /** Sessions currently registered; only touched while holding {@link #transferMutex}. */
+    LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+
+    public ReplicationSlave(ReplicationServer replicationServer) {
+        this.replicationServer = replicationServer;
+    }
+
+    /**
+     * Connects to the master and sends the SLAVE_INIT frame carrying this node's id.
+     *
+     * @throws Exception if the master URI is invalid or the transport cannot be
+     *         created, started or written to
+     */
+    public void start() throws Exception {
+        transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+        transport.setTransportListener(this);
+        transport.start();
+
+        ReplicationFrame frame = new ReplicationFrame();
+        frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+        PBSlaveInit payload = new PBSlaveInit();
+        payload.setNodeId(replicationServer.getNodeId());
+        frame.setPayload(payload);
+        LOG.info("Sending master slave init command: "+payload);
+        transport.oneway(frame);
+    }
+
+    /**
+     * Currently a no-op.
+     */
+    public void stop() throws Exception {
+        // NOTE(review): the control transport opened in start() and any running
+        // transfer sessions are never stopped here. failed() calls this from the
+        // transport listener callbacks, so stopping the transport inline may not be
+        // safe - confirm before adding shutdown logic.
+    }
+
+    public void onClusterChange(ClusterState config) {
+    }
+
+    /**
+     * Dispatches a frame received on the control transport. Only
+     * SLAVE_INIT_RESPONSE is handled; other frame types are ignored.
+     */
+    public void onCommand(Object command) {
+        try {
+            ReplicationFrame frame = (ReplicationFrame) command;
+            switch (frame.getHeader().getType()) {
+            case SLAVE_INIT_RESPONSE:
+                onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
+                break;
+            }
+        } catch (Exception e) {
+            failed(e);
+        }
+    }
+
+    public void onException(IOException error) {
+        failed(error);
+    }
+
+    /**
+     * Logs the failure and stops this slave.
+     *
+     * @param error the cause of the failure
+     */
+    public void failed(Exception error) {
+        LOG.error("Replication slave failure", error);
+        try {
+            stop();
+        } catch (Exception e) {
+            LOG.warn("Could not stop the replication slave after a failure", e);
+        }
+    }
+
+    public void transportInterupted() {
+    }
+
+    public void transportResumed() {
+    }
+
+    /**
+     * Applies the master's init response: deletes the listed files, replaces the
+     * pending transfer queue with the files to copy and starts transfer sessions.
+     */
+    private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
+        LOG.info("Got init response: "+response);
+        delete(response.getDeleteFilesList());
+        synchronized(transferMutex) {
+            transferQueue.clear();
+            transferQueue.addAll(response.getCopyFilesList());
+        }
+        addTransferSession();
+    }
+
+    /**
+     * @return the next file to transfer, or <code>null</code> when the queue is empty
+     */
+    private PBFileInfo dequeueTransferQueue() throws Exception {
+        synchronized( transferMutex ) {
+            if( transferQueue.isEmpty() ) {
+                return null;
+            }
+            return transferQueue.removeFirst();
+        }
+    }
+
+    /**
+     * Starts new transfer sessions while files are pending and fewer than
+     * {@link #MAX_TRANSFER_SESSIONS} sessions are registered.
+     */
+    private void addTransferSession() throws Exception {
+        synchronized( transferMutex ) {
+            while( !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS ) {
+                TransferSession transferSession = new TransferSession();
+                transferSessions.add(transferSession);
+                try {
+                    transferSession.start();
+                } catch (Exception e) {
+                    transferSessions.remove(transferSession);
+                    // start() only throws before anything was dequeued, so the loop
+                    // condition would still hold and we would spin forever while
+                    // holding transferMutex. Give up; a later session shutdown calls
+                    // this method again.
+                    LOG.warn("Could not start a file transfer session", e);
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Downloads files from the master over its own transport, one request at a
+     * time, until the transfer queue is empty or a failure occurs.
+     */
+    class TransferSession implements Service, TransportListener {
+
+        Transport transport;
+        /** File currently being transferred, or null when no request is outstanding. */
+        private PBFileInfo info;
+        /** Local destination of the file currently being transferred. */
+        private File toFile;
+        private final AtomicBoolean stopped = new AtomicBoolean();
+        /** Time (ms since epoch) at which the outstanding request was sent. */
+        private long transferStart;
+
+        /**
+         * Opens a transport to the master and requests the first file.
+         *
+         * @throws Exception if the transport cannot be created or started
+         */
+        public void start() throws Exception {
+            LOG.info("File transfer session started.");
+            transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+            transport.setTransportListener(this);
+            transport.start();
+            sendNextRequestOrStop();
+        }
+
+        /**
+         * Requests the next queued file, or stops the session when none is left.
+         * Any failure is routed to {@link #failed(Exception)}.
+         */
+        private void sendNextRequestOrStop() {
+            try {
+                PBFileInfo next = dequeueTransferQueue();
+                if( next == null ) {
+                    stop();
+                    return;
+                }
+
+                toFile = replicationServer.getReplicationFile(next.getName());
+                this.info = next;
+
+                ReplicationFrame frame = new ReplicationFrame();
+                frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
+                frame.setPayload(next);
+
+                LOG.info("Requesting file: "+next.getName());
+                transferStart = System.currentTimeMillis();
+
+                transport.oneway(frame);
+            } catch ( Exception e ) {
+                failed(e);
+            }
+        }
+
+        /**
+         * Stops the session once. A file whose transfer was still outstanding is
+         * put back on the queue, then the transport is stopped on a daemon thread
+         * which also deregisters the session and lets replacement sessions start.
+         */
+        public void stop() throws Exception {
+            if( stopped.compareAndSet(false, true) ) {
+                LOG.info("File transfer session stopped.");
+                synchronized( transferMutex ) {
+                    if( info!=null ) {
+                        transferQueue.addLast(info);
+                    }
+                    info = null;
+                }
+                // NOTE(review): presumably done on a separate thread because stop()
+                // can be reached from the transport's own listener callbacks - confirm.
+                Thread stopThread = new Thread("Transfer Session Shutdown: "+transport.getRemoteAddress()) {
+                    @Override
+                    public void run() {
+                        try {
+                            transport.stop();
+                        } catch (Exception e) {
+                            LOG.warn("Could not stop the file transfer transport", e);
+                        }
+                        try {
+                            synchronized( transferMutex ) {
+                                // Inside this anonymous Thread a bare 'this' is the thread,
+                                // so the enclosing session must be named explicitly.
+                                transferSessions.remove(TransferSession.this);
+                                addTransferSession();
+                            }
+                        } catch (Exception e) {
+                            LOG.warn("Could not start replacement file transfer sessions", e);
+                        }
+                    }
+                };
+                stopThread.setDaemon(true);
+                stopThread.start();
+            }
+        }
+
+        /**
+         * Handles the response to the outstanding request: writes the payload
+         * stream to the destination file, then requests the next file.
+         */
+        public void onCommand(Object command) {
+            try {
+                ReplicationFrame frame = (ReplicationFrame) command;
+                InputStream is = (InputStream) frame.getPayload();
+                toFile.getParentFile().mkdirs();
+                FileOutputStream os = new FileOutputStream( toFile );
+                try {
+                    long payloadSize = frame.getHeader().getPayloadSize();
+                    copy(is, os, payloadSize);
+                    long transferTime = System.currentTimeMillis()-this.transferStart;
+                    // Kb/sec = (bytes / 1024) / (ms / 1000); clamp the elapsed time so
+                    // a sub-millisecond transfer does not divide by zero.
+                    float rate = (payloadSize * 1000f) / (1024f * Math.max(1L, transferTime));
+                    LOG.info("File "+info.getName()+" transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
+                } finally {
+                    os.close();
+                }
+                // Cleared only after a complete copy so that stop() re-queues the
+                // file if the transfer failed part way through.
+                this.info = null;
+                this.toFile = null;
+
+                sendNextRequestOrStop();
+            } catch (Exception e) {
+                failed(e);
+            }
+        }
+
+        public void onException(IOException error) {
+            failed(error);
+        }
+
+        /**
+         * Logs the failure (unless the session was already stopped) and stops
+         * the session.
+         *
+         * @param error the cause of the failure
+         */
+        public void failed(Exception error) {
+            try {
+                if( !stopped.get() ) {
+                    LOG.warn("Replication session failure: "+transport.getRemoteAddress(), error);
+                }
+                stop();
+            } catch (Exception e) {
+                LOG.warn("Could not stop the failed file transfer session", e);
+            }
+        }
+
+        public void transportInterupted() {
+        }
+        public void transportResumed() {
+        }
+
+    }
+
+    /**
+     * Copies exactly <code>length</code> bytes from <code>is</code> to <code>os</code>.
+     *
+     * @throws IOException if reading or writing fails, or if the stream ends
+     *         before <code>length</code> bytes were read
+     */
+    private void copy(InputStream is, OutputStream os, long length) throws IOException {
+        byte buffer[] = new byte[1024 * 4];
+        long pos=0;
+        while ( pos < length ) {
+            int c = is.read(buffer, 0, (int)Math.min(buffer.length, length-pos));
+            if( c < 0 ) {
+                // A short stream would otherwise leave a truncated file that is
+                // reported as transferred.
+                throw new IOException("Unexpected end of stream after "+pos+" of "+length+" bytes");
+            }
+            os.write(buffer, 0, c);
+            pos+=c;
+        }
+    }
+
+    /**
+     * Deletes the named replication files. A file that cannot be resolved is
+     * logged and skipped so the remaining files are still processed.
+     */
+    private void delete(List<String> files) {
+        for (String fn : files) {
+            try {
+                replicationServer.getReplicationFile(fn).delete();
+            } catch (IOException e) {
+                LOG.warn("Could not delete replication file: "+fn, e);
+            }
+        }
+    }
+
+
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java Fri Nov 7 11:00:25 2008
@@ -1,62 +1,62 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication;
-
-import java.util.ArrayList;
-
-public class StaticClusterStateManager implements ClusterStateManager {
-
- final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
- private ClusterState clusterState;
- private int startCounter;
-
- synchronized public ClusterState getClusterState() {
- return clusterState;
- }
-
- synchronized public void setClusterState(ClusterState clusterState) {
- this.clusterState = clusterState;
- fireClusterChange();
- }
-
- synchronized public void addListener(ClusterListener listener) {
- listeners.add(listener);
- fireClusterChange();
- }
-
- synchronized public void removeListener(ClusterListener listener) {
- listeners.remove(listener);
- }
-
- synchronized public void start() throws Exception {
- startCounter++;
- fireClusterChange();
- }
-
- synchronized private void fireClusterChange() {
- if( startCounter>0 && !listeners.isEmpty() && clusterState!=null ) {
- for (ClusterListener listener : listeners) {
- listener.onClusterChange(clusterState);
- }
- }
- }
-
- synchronized public void stop() throws Exception {
- startCounter--;
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+
+/**
+ * A {@link ClusterStateManager} whose cluster state is assigned explicitly via
+ * {@link #setClusterState(ClusterState)}. Registered listeners are notified
+ * whenever the state is set, a listener is added or the manager is started,
+ * provided the manager has been started more often than stopped, at least one
+ * listener is registered and a state has been set.
+ */
+public class StaticClusterStateManager implements ClusterStateManager {
+
+    private final ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+    private ClusterState clusterState;
+    /** Number of start() calls minus number of stop() calls. */
+    private int startCounter;
+
+    public synchronized ClusterState getClusterState() {
+        return clusterState;
+    }
+
+    /** Stores the new state and notifies the listeners. */
+    public synchronized void setClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange();
+    }
+
+    /** Registers the listener and notifies all listeners of the current state. */
+    public synchronized void addListener(ClusterListener listener) {
+        listeners.add(listener);
+        fireClusterChange();
+    }
+
+    public synchronized void removeListener(ClusterListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** Counts the start and notifies the listeners of the current state. */
+    public synchronized void start() throws Exception {
+        startCounter++;
+        fireClusterChange();
+    }
+
+    public synchronized void stop() throws Exception {
+        startCounter--;
+    }
+
+    /**
+     * Passes the current state to every listener, unless the manager is not
+     * started, has no listeners or has no state yet.
+     */
+    private synchronized void fireClusterChange() {
+        boolean started = startCounter > 0;
+        if( !started || listeners.isEmpty() || clusterState == null ) {
+            return;
+        }
+        for (ClusterListener each : listeners) {
+            each.onClusterChange(clusterState);
+        }
+    }
+
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java Fri Nov 7 11:00:25 2008
@@ -1,57 +1,57 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication.transport;
-
-import java.util.Map;
-
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-
-/**
- * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class KDBRTransportFactory extends TcpTransportFactory {
-
- protected String getDefaultWireFormatType() {
- return "kdbr";
- }
-
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- IntrospectionSupport.setProperties(transport, options);
- return super.compositeConfigure(transport, format, options);
- }
-
- protected boolean isUseInactivityMonitor(Transport transport) {
- return false;
- }
-
- /**
- * Override to remove the correlation transport filter since that relies on Command to
- * multiplex multiple requests and this protocol does not support that.
- */
- public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
- transport = compositeConfigure(transport, wf, options);
- transport = new MutexTransport(transport);
-
- return transport;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * A TCP-based transport factory whose default wire format is "kdbr", used by
+ * the classes in the org.apache.kahadb.replication packages.
+ *
+ * @version $Revision$
+ */
+public class KDBRTransportFactory extends TcpTransportFactory {
+
+ /**
+ * @return the wire format name "kdbr"
+ */
+ protected String getDefaultWireFormatType() {
+ return "kdbr";
+ }
+
+ /**
+ * Applies the given options to the transport as bean properties and then
+ * delegates to the superclass implementation.
+ */
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ /**
+ * Always returns false.
+ * NOTE(review): presumably this keeps the superclass from installing an
+ * inactivity monitor on the transport - confirm against TcpTransportFactory.
+ */
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false;
+ }
+
+ /**
+ * Override to remove the correlation transport filter since that relies on Command to
+ * multiplex multiple requests and this protocol does not support that.
+ * The transport is configured via compositeConfigure and then wrapped in a
+ * MutexTransport.
+ */
+ public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+ transport = compositeConfigure(transport, wf, options);
+ transport = new MutexTransport(transport);
+
+ return transport;
+ }
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java Fri Nov 7 11:00:25 2008
@@ -1,126 +1,126 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication.transport;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.activemq.protobuf.Message;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.replication.ReplicationFrame;
-import org.apache.kahadb.replication.pb.PBFileInfo;
-import org.apache.kahadb.replication.pb.PBHeader;
-import org.apache.kahadb.replication.pb.PBJournalLocation;
-import org.apache.kahadb.replication.pb.PBJournalUpdate;
-import org.apache.kahadb.replication.pb.PBSlaveInit;
-import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class KDBRWireFormat implements WireFormat {
-
- private int version;
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public ByteSequence marshal(Object command) throws IOException {
- throw new RuntimeException("Not implemented.");
- }
-
- public Object unmarshal(ByteSequence packet) throws IOException {
- throw new RuntimeException("Not implemented.");
- }
-
- public void marshal(Object command, DataOutput out) throws IOException {
- OutputStream os = (OutputStream) out;
- ReplicationFrame frame = (ReplicationFrame) command;
- PBHeader header = frame.getHeader();
- switch (frame.getHeader().getType()) {
- case FILE_TRANSFER_RESPONSE: {
- // Write the header..
- header.writeFramed(os);
- // Stream the Payload.
- InputStream is = (InputStream) frame.getPayload();
- byte data[] = new byte[1024 * 4];
- int c;
- long remaining = frame.getHeader().getPayloadSize();
- while (remaining > 0 && (c = is.read(data, 0, (int) Math.min(remaining, data.length))) >= 0) {
- os.write(data, 0, c);
- remaining -= c;
- }
- break;
- }
- default:
- if (frame.getPayload() == null) {
- header.clearPayloadSize();
- header.writeFramed(os);
- } else {
- // All other payloads types are PB messages
- Message message = (Message) frame.getPayload();
- header.setPayloadSize(message.serializedSizeUnframed());
- header.writeFramed(os);
- message.writeUnframed(os);
- }
- }
- }
-
- public Object unmarshal(DataInput in) throws IOException {
- InputStream is = (InputStream) in;
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(PBHeader.parseFramed(is));
- switch (frame.getHeader().getType()) {
- case FILE_TRANSFER_RESPONSE:
- frame.setPayload(is);
- break;
- case FILE_TRANSFER:
- readPBPayload(frame, in, new PBFileInfo());
- break;
- case JOURNAL_UPDATE:
- readPBPayload(frame, in, new PBJournalUpdate());
- break;
- case JOURNAL_UPDATE_ACK:
- readPBPayload(frame, in, new PBJournalLocation());
- break;
- case SLAVE_INIT:
- readPBPayload(frame, in, new PBSlaveInit());
- break;
- case SLAVE_INIT_RESPONSE:
- readPBPayload(frame, in, new PBSlaveInitResponse());
- break;
- }
- return frame;
- }
-
- private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
- long payloadSize = frame.getHeader().getPayloadSize();
- byte[] payload;
- payload = new byte[(int)payloadSize];
- in.readFully(payload);
- frame.setPayload(pb.mergeUnframed(payload));
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Wire format for {@link ReplicationFrame} objects: each frame is written as a framed
+ * {@link PBHeader} followed by an optional payload.
+ */
+public class KDBRWireFormat implements WireFormat {
+
+ // Plain property behind getVersion()/setVersion(); nothing else in this class reads it.
+ private int version;
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ /**
+ * Not supported by this wire format.
+ *
+ * @throws RuntimeException always
+ */
+ public ByteSequence marshal(Object command) throws IOException {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ /**
+ * Not supported by this wire format.
+ *
+ * @throws RuntimeException always
+ */
+ public Object unmarshal(ByteSequence packet) throws IOException {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ /**
+ * Writes a frame's header and payload to the output.
+ *
+ * @param command the frame to write; it is cast to ReplicationFrame
+ * @param out the destination; it is cast to OutputStream
+ * @throws IOException if writing the header or payload fails
+ * @throws ClassCastException if either cast above does not hold
+ */
+ public void marshal(Object command, DataOutput out) throws IOException {
+ OutputStream os = (OutputStream) out;
+ ReplicationFrame frame = (ReplicationFrame) command;
+ PBHeader header = frame.getHeader();
+ switch (frame.getHeader().getType()) {
+ case FILE_TRANSFER_RESPONSE: {
+ // Write the header..
+ // The payload size is not set here, so the header must already carry it.
+ header.writeFramed(os);
+ // Stream the Payload.
+ InputStream is = (InputStream) frame.getPayload();
+ byte data[] = new byte[1024 * 4];
+ int c;
+ long remaining = frame.getHeader().getPayloadSize();
+ // Copy at most 'remaining' bytes; the loop also ends when read() reports end of stream.
+ // NOTE(review): if the stream ends early, fewer bytes than the header announced are
+ // written and no error is raised - confirm the receiver can cope with that.
+ while (remaining > 0 && (c = is.read(data, 0, (int) Math.min(remaining, data.length))) >= 0) {
+ os.write(data, 0, c);
+ remaining -= c;
+ }
+ break;
+ }
+ default:
+ if (frame.getPayload() == null) {
+ // No payload: clear the size field and send the header alone.
+ header.clearPayloadSize();
+ header.writeFramed(os);
+ } else {
+ // All other payloads types are PB messages
+ Message message = (Message) frame.getPayload();
+ // The size must be stored in the header before the header itself is written.
+ header.setPayloadSize(message.serializedSizeUnframed());
+ header.writeFramed(os);
+ message.writeUnframed(os);
+ }
+ }
+ }
+
+ /**
+ * Reads one frame: a framed header, then a payload chosen by the header type.
+ *
+ * @param in the source; it is cast to InputStream
+ * @return the ReplicationFrame that was read
+ * @throws IOException if reading or parsing fails
+ * @throws ClassCastException if in is not an InputStream
+ */
+ public Object unmarshal(DataInput in) throws IOException {
+ InputStream is = (InputStream) in;
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(PBHeader.parseFramed(is));
+ // There is no default branch: a type not listed below yields a frame with a header only.
+ switch (frame.getHeader().getType()) {
+ case FILE_TRANSFER_RESPONSE:
+ // The payload bytes are not consumed here; the raw input stream is handed over instead.
+ // NOTE(review): presumably the caller reads exactly getPayloadSize() bytes from it - confirm.
+ frame.setPayload(is);
+ break;
+ case FILE_TRANSFER:
+ readPBPayload(frame, in, new PBFileInfo());
+ break;
+ case JOURNAL_UPDATE:
+ readPBPayload(frame, in, new PBJournalUpdate());
+ break;
+ case JOURNAL_UPDATE_ACK:
+ readPBPayload(frame, in, new PBJournalLocation());
+ break;
+ case SLAVE_INIT:
+ readPBPayload(frame, in, new PBSlaveInit());
+ break;
+ case SLAVE_INIT_RESPONSE:
+ readPBPayload(frame, in, new PBSlaveInitResponse());
+ break;
+ }
+ return frame;
+ }
+
+ /**
+ * Reads the number of bytes given by the header's payload size, merges them into the
+ * supplied message, and stores the result as the frame's payload.
+ *
+ * @param frame the frame whose header supplies the size and which receives the payload
+ * @param in the source the payload bytes are read from
+ * @param pb the message instance the bytes are merged into
+ */
+ private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
+ long payloadSize = frame.getHeader().getPayloadSize();
+ byte[] payload;
+ // NOTE(review): the long size is narrowed to int without a range check, so a value above
+ // Integer.MAX_VALUE would be truncated - confirm the sender never produces one.
+ payload = new byte[(int)payloadSize];
+ in.readFully(payload);
+ frame.setPayload(pb.mergeUnframed(payload));
+ }
+
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java Fri Nov 7 11:00:25 2008
@@ -1,30 +1,30 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication.transport;
-
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
-
-/**
- * @version $Revision$
- */
-public class KDBRWireFormatFactory implements WireFormatFactory {
-
- public WireFormat createWireFormat() {
- return new KDBRWireFormat();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * {@link WireFormatFactory} that produces {@link KDBRWireFormat} instances.
+ *
+ * @version $Revision$
+ */
+public class KDBRWireFormatFactory implements WireFormatFactory {
+
+ /**
+ * Creates a new wire format on every call.
+ *
+ * @return a freshly constructed {@link KDBRWireFormat}
+ */
+ public WireFormat createWireFormat() {
+ return new KDBRWireFormat();
+ }
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteArrayInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteArrayInputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteArrayOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteArrayOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java Fri Nov 7 11:00:25 2008
@@ -1,115 +1,115 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.util;
-
-import java.util.ArrayList;
-
-/**
- * Support utility that can be used to set the properties on any object
- * using command line arguments.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class CommandLineSupport {
-
- /**
- * Sets the properties of an object given the command line args.
- *
- * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
- *
- * then it will try to call the following setters on the target object.
- *
- * target.setAckMode("AUTO");
- * target.setURL(new URI("tcp://localhost:61616") );
- * target.setPersistent(true);
- *
- * Notice the the proper conversion for the argument is determined by examining the
- * setter argument type.
- *
- * @param target the object that will have it's properties set
- * @param args the command line options
- * @return any arguments that are not valid options for the target
- */
- static public String[] setOptions(Object target, String []args) {
- ArrayList rc = new ArrayList();
-
- for (int i = 0; i < args.length; i++) {
- if( args[i] == null )
- continue;
-
- if( args[i].startsWith("--") ) {
-
- // --options without a specified value are considered boolean flags that are enabled.
- String value="true";
- String name = args[i].substring(2);
-
- // if --option=value case
- int p = name.indexOf("=");
- if( p > 0 ) {
- value = name.substring(p+1);
- name = name.substring(0,p);
- }
-
- // name not set, then it's an unrecognized option
- if( name.length()==0 ) {
- rc.add(args[i]);
- continue;
- }
-
- String propName = convertOptionToPropertyName(name);
- if( !IntrospectionSupport.setProperty(target, propName, value) ) {
- rc.add(args[i]);
- continue;
- }
- } else {
- rc.add(args[i]);
- }
-
- }
-
- String r[] = new String[rc.size()];
- rc.toArray(r);
- return r;
- }
-
- /**
- * converts strings like: test-enabled to testEnabled
- * @param name
- * @return
- */
- private static String convertOptionToPropertyName(String name) {
- String rc="";
-
- // Look for '-' and strip and then convert the subsequent char to uppercase
- int p = name.indexOf("-");
- while( p > 0 ) {
- // strip
- rc += name.substring(0, p);
- name = name.substring(p+1);
-
- // can I convert the next char to upper?
- if( name.length() >0 ) {
- rc += name.substring(0,1).toUpperCase();
- name = name.substring(1);
- }
-
- p = name.indexOf("-");
- }
- return rc+name;
- }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.util.ArrayList;
+
+/**
+ * Support utility that can be used to set the properties on any object
+ * using command line arguments.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class CommandLineSupport {
+
+ /**
+ * Sets the properties of an object given the command line args.
+ *
+ * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
+ *
+ * then it will try to call the following setters on the target object.
+ *
+ * target.setAckMode("AUTO");
+ * target.setURL(new URI("tcp://localhost:61616") );
+ * target.setPersistent(true);
+ *
+ * Notice that the proper conversion for the argument is determined by examining the
+ * setter argument type.
+ *
+ * Null entries in args are skipped. Arguments that do not start with "--", a bare "--",
+ * and options that IntrospectionSupport.setProperty rejects are returned in their
+ * original order.
+ *
+ * @param target the object that will have its properties set
+ * @param args the command line options
+ * @return any arguments that are not valid options for the target
+ */
+ static public String[] setOptions(Object target, String []args) {
+ ArrayList rc = new ArrayList();
+
+ for (int i = 0; i < args.length; i++) {
+ if( args[i] == null )
+ continue;
+
+ if( args[i].startsWith("--") ) {
+
+ // --options without a specified value are considered boolean flags that are enabled.
+ String value="true";
+ String name = args[i].substring(2);
+
+ // if --option=value case
+ // (p > 0 means an '=' directly after the "--" is not treated as a separator)
+ int p = name.indexOf("=");
+ if( p > 0 ) {
+ value = name.substring(p+1);
+ name = name.substring(0,p);
+ }
+
+ // name not set, then it's an unrecognized option
+ // (only a bare "--" reaches this branch)
+ if( name.length()==0 ) {
+ rc.add(args[i]);
+ continue;
+ }
+
+ String propName = convertOptionToPropertyName(name);
+ if( !IntrospectionSupport.setProperty(target, propName, value) ) {
+ rc.add(args[i]);
+ continue;
+ }
+ } else {
+ rc.add(args[i]);
+ }
+
+ }
+
+ String r[] = new String[rc.size()];
+ rc.toArray(r);
+ return r;
+ }
+
+ /**
+ * converts strings like: test-enabled to testEnabled
+ *
+ * A '-' at index 0 is left in place because the loop only acts on a dash found at a
+ * position greater than 0; a trailing '-' is dropped.
+ *
+ * @param name the option name with its leading "--" already removed
+ * @return the name with each handled '-' removed and the following character upper-cased
+ */
+ private static String convertOptionToPropertyName(String name) {
+ String rc="";
+
+ // Look for '-' and strip and then convert the subsequent char to uppercase
+ int p = name.indexOf("-");
+ while( p > 0 ) {
+ // strip
+ rc += name.substring(0, p);
+ name = name.substring(p+1);
+
+ // can I convert the next char to upper?
+ if( name.length() >0 ) {
+ rc += name.substring(0,1).toUpperCase();
+ name = name.substring(1);
+ }
+
+ p = name.indexOf("-");
+ }
+ return rc+name;
+ }
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/CommandLineSupport.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java Fri Nov 7 11:00:25 2008
@@ -24,7 +24,7 @@
/**
* Optimized ByteArrayInputStream that can be used more than once
*
- * @version $Revision: 1.1.1.1 $
+ * @version $Revision$
*/
public final class DataByteArrayInputStream extends InputStream implements DataInput {
private byte[] buf;
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java Fri Nov 7 11:00:25 2008
@@ -24,7 +24,7 @@
/**
* Optimized ByteArrayOutputStream
*
- * @version $Revision: 1.1.1.1 $
+ * @version $Revision$
*/
public class DataByteArrayOutputStream extends OutputStream implements DataOutput {
private static final int DEFAULT_SIZE = 2048;
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DiskBenchmark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DiskBenchmark.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java Fri Nov 7 11:00:25 2008
@@ -19,7 +19,7 @@
/**
* Used to convert to hex from byte arrays and back.
*
- * @version $Revision: 1.2 $
+ * @version $Revision$
*/
public final class HexSupport {
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/HexSupport.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOExceptionSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOExceptionSupport.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java Fri Nov 7 11:00:25 2008
@@ -25,7 +25,7 @@
import java.io.OutputStream;
/**
- * @version $Revision: 661435 $
+ * @version $Revision$
*/
public final class IOHelper {
protected static final int MAX_DIR_NAME_LENGTH;
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LRUCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LRUCache.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Scheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Scheduler.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SchedulerTimerTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SchedulerTimerTask.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Fri Nov 7 11:00:25 2008
@@ -1,99 +1,99 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-package org.apache.kahadb.replication.pb;
-
-option java_multiple_files = true;
-option java_outer_classname = "PB";
-
-//
-//
-//
-message PBHeader {
- required PBType type=1;
- optional int64 payload_size=2;
-}
-
-enum PBType {
-
- // Sent from the slave to the master when the slave first starts. It lets the master
- // know about the slave's synchronization state. This allows the master to decide how to best synchronize
- // the slave.
- //
- // @followed-by PBSlaveInit
- SLAVE_INIT = 0;
-
- // The Master will send this response back to the slave, letting it know what it needs to do to get
- // its data files synchronized with the master.
- //
- // @followed-by PBSlaveInitResponse
- SLAVE_INIT_RESPONSE = 1;
-
- // Sent from the Master to the slave to replicate a Journal update.
- //
- // @followed-by PBJournalUpdate
- JOURNAL_UPDATE=3;
-
- // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
- // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until
- // all bulk file copies are complete.
- //
- // @followed-by PBJournalLocation
- JOURNAL_UPDATE_ACK=4;
-
- // A Request for a bulk file transfer. Sent from a slave to a Master
- //
- // @followed-by PBFileInfo
- FILE_TRANSFER=5;
-
- // A bulk file transfer response
- //
- // @followed-by the bytes of the requested file.
- FILE_TRANSFER_RESPONSE=6;
-}
-
-message PBFileInfo {
- required string name=1;
- optional int32 snapshot_id=2;
- optional sfixed64 checksum=3;
- optional int64 start=4;
- optional int64 end=5;
-}
-
-message PBJournalLocation {
- required int32 file_id=1;
- required int32 offset=2;
-}
-
-message PBSlaveInit {
- // The id of the slave node that is being initialized
- required string node_id=1;
- // The files that the slave node currently has
- repeated PBFileInfo current_files=2;
-}
-
-message PBSlaveInitResponse {
- // The files that the slave should bulk copy from the master.
- repeated PBFileInfo copy_files=1;
- // The files that the slave should delete
- repeated string delete_files=2;
-}
-
-message PBJournalUpdate {
- required PBJournalLocation location=1;
- required bytes data=2;
-}
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.replication.pb;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//
+//
+//
+message PBHeader {
+ required PBType type=1;
+ optional int64 payload_size=2;
+}
+
+enum PBType {
+
+ // Sent from the slave to the master when the slave first starts. It lets the master
+ // know about the slave's synchronization state. This allows the master to decide how to best synchronize
+ // the slave.
+ //
+ // @followed-by PBSlaveInit
+ SLAVE_INIT = 0;
+
+ // The Master will send this response back to the slave, letting it know what it needs to do to get
+ // its data files synchronized with the master.
+ //
+ // @followed-by PBSlaveInitResponse
+ SLAVE_INIT_RESPONSE = 1;
+
+ // Sent from the Master to the slave to replicate a Journal update.
+ //
+ // @followed-by PBJournalUpdate
+ JOURNAL_UPDATE=3;
+
+ // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
+ // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until
+ // all bulk file copies are complete.
+ //
+ // @followed-by PBJournalLocation
+ JOURNAL_UPDATE_ACK=4;
+
+ // A Request for a bulk file transfer. Sent from a slave to a Master
+ //
+ // @followed-by PBFileInfo
+ FILE_TRANSFER=5;
+
+ // A bulk file transfer response
+ //
+ // @followed-by the bytes of the requested file.
+ FILE_TRANSFER_RESPONSE=6;
+}
+
+message PBFileInfo {
+ required string name=1;
+ optional int32 snapshot_id=2;
+ optional sfixed64 checksum=3;
+ optional int64 start=4;
+ optional int64 end=5;
+}
+
+message PBJournalLocation {
+ required int32 file_id=1;
+ required int32 offset=2;
+}
+
+message PBSlaveInit {
+ // The id of the slave node that is being initialized
+ required string node_id=1;
+ // The files that the slave node currently has
+ repeated PBFileInfo current_files=2;
+}
+
+message PBSlaveInitResponse {
+ // The files that the slave should bulk copy from the master.
+ repeated PBFileInfo copy_files=1;
+ // The files that the slave should delete
+ repeated string delete_files=2;
+}
+
+message PBJournalUpdate {
+ required PBJournalLocation location=1;
+ required bytes data=2;
+}
+
Propchange: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (original)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Nov 7 11:00:25 2008
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.kahadb.replication.transport.KDBRTransportFactory
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRTransportFactory
Propchange: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (original)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Nov 7 11:00:25 2008
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory
Propchange: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/eclipse-resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712224&r1=712223&r2=712224&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov 7 11:00:25 2008
@@ -1,110 +1,110 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kahadb.replication;
-
-import java.util.Arrays;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-
-public class ReplicationTest extends TestCase {
-
-
- private static final String BROKER1_URI = "tcp://localhost:61001";
- private static final String BROKER2_URI = "tcp://localhost:61002";
-
- private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
- private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
-
- private Destination destination = new ActiveMQQueue("TEST_QUEUE");
-
- public void testReplication() throws Exception {
-
- // This cluster object will control who becomes the master.
- StaticClusterStateManager cluster = new StaticClusterStateManager();
-
- ReplicatedBrokerService b1 = new ReplicatedBrokerService();
- b1.addConnector(BROKER1_URI);
- b1.setDataDirectory("target/replication-test/broker1");
- b1.setBrokerName("broker1");
- b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
- b1.getReplicationServer().setCluster(cluster);
- b1.start();
-
- ReplicatedBrokerService b2 = new ReplicatedBrokerService();
- b2.addConnector(BROKER2_URI);
- b2.setDataDirectory("target/replication-test/broker2");
- b2.setBrokerName("broker2");
- b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
- b2.getReplicationServer().setCluster(cluster);
- b2.start();
-
-// // None of the brokers should be accepting connections since they are not masters.
-// try {
-// sendMesagesTo(1, BROKER1_URI);
-// fail("Connection failure expected.");
-// } catch( JMSException e ) {
-// }
-
- // Make b1 the master.
- ClusterState clusterState = new ClusterState();
- clusterState.setMaster(BROKER1_REPLICATION_ID);
- cluster.setClusterState(clusterState);
-
- try {
- sendMesagesTo(500, BROKER1_URI);
- } catch( JMSException e ) {
- fail("b1 did not become a master.");
- }
-
- // Make broker 2 a slave.
- clusterState = new ClusterState();
- clusterState.setMaster(BROKER1_REPLICATION_ID);
- String[] slaves = {BROKER2_REPLICATION_ID};
- clusterState.setSlaves(Arrays.asList(slaves));
- cluster.setClusterState(clusterState);
-
- Thread.sleep(10000);
-
- b2.stop();
- b1.stop();
-
- }
-
- private void sendMesagesTo(int count, String brokerUri) throws JMSException {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
- Connection con = cf.createConnection();
- try {
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage("Hello: "+i));
- }
- } finally {
- try { con.close(); } catch (Throwable e) {}
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ReplicationTest extends TestCase {
+
+
+ private static final String BROKER1_URI = "tcp://localhost:61001";
+ private static final String BROKER2_URI = "tcp://localhost:61002";
+
+ private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+ private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+
+ private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+ public void testReplication() throws Exception {
+
+ // This cluster object will control who becomes the master.
+ StaticClusterStateManager cluster = new StaticClusterStateManager();
+
+ ReplicatedBrokerService b1 = new ReplicatedBrokerService();
+ b1.addConnector(BROKER1_URI);
+ b1.setDataDirectory("target/replication-test/broker1");
+ b1.setBrokerName("broker1");
+ b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+ b1.getReplicationServer().setCluster(cluster);
+ b1.start();
+
+ ReplicatedBrokerService b2 = new ReplicatedBrokerService();
+ b2.addConnector(BROKER2_URI);
+ b2.setDataDirectory("target/replication-test/broker2");
+ b2.setBrokerName("broker2");
+ b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+ b2.getReplicationServer().setCluster(cluster);
+ b2.start();
+
+// // None of the brokers should be accepting connections since they are not masters.
+// try {
+// sendMesagesTo(1, BROKER1_URI);
+// fail("Connection failure expected.");
+// } catch( JMSException e ) {
+// }
+
+ // Make b1 the master.
+ ClusterState clusterState = new ClusterState();
+ clusterState.setMaster(BROKER1_REPLICATION_ID);
+ cluster.setClusterState(clusterState);
+
+ try {
+ sendMesagesTo(500, BROKER1_URI);
+ } catch( JMSException e ) {
+ fail("b1 did not become a master.");
+ }
+
+ // Make broker 2 a slave.
+ clusterState = new ClusterState();
+ clusterState.setMaster(BROKER1_REPLICATION_ID);
+ String[] slaves = {BROKER2_REPLICATION_ID};
+ clusterState.setSlaves(Arrays.asList(slaves));
+ cluster.setClusterState(clusterState);
+
+ Thread.sleep(10000);
+
+ b2.stop();
+ b1.stop();
+
+ }
+
+ private void sendMesagesTo(int count, String brokerUri) throws JMSException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+ Connection con = cf.createConnection();
+ try {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < count; i++) {
+ producer.send(session.createTextMessage("Hello: "+i));
+ }
+ } finally {
+ try { con.close(); } catch (Throwable e) {}
+ }
+ }
+
+}
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision