You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/18 20:53:51 UTC
svn commit: r755716 [2/2] -
/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.broker.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+
+/**
+ * Common base for store operations. Each instance pairs the owning
+ * {@link KahaDBStore} with a journal {@link Location}; subclasses supply the
+ * work performed by {@link #execute(Transaction)}.
+ */
+public abstract class Operation {
+
+    protected final KahaDBStore store;
+    final Location location;
+
+    /**
+     * Captures the store and location that subclasses use when executing.
+     *
+     * @param store    the store the operation is applied to
+     * @param location the location associated with this operation
+     */
+    public Operation(KahaDBStore store, Location location) {
+        this.store = store;
+        this.location = location;
+    }
+
+    /**
+     * @return the location supplied at construction time
+     */
+    public Location getLocation() {
+        return this.location;
+    }
+
+    /**
+     * Performs this operation using the supplied page transaction.
+     *
+     * @param tx the transaction handed through to the store
+     * @throws IOException if the underlying store call fails
+     */
+    public abstract void execute(Transaction tx) throws IOException;
+
+    /**
+     * Operation bound to a {@link KahaAddMessageCommand}.
+     */
+    public static class AddOpperation extends Operation {
+
+        final KahaAddMessageCommand command;
+
+        /**
+         * @param store    the store the operation is applied to
+         * @param command  the add-message command to hand to the store
+         * @param location the location associated with the command
+         */
+        public AddOpperation(KahaDBStore store, KahaAddMessageCommand command, Location location) {
+            super(store, location);
+            this.command = command;
+        }
+
+        /**
+         * Passes the transaction, command and location to the store.
+         */
+        @Override
+        public void execute(Transaction tx) throws IOException {
+            // NOTE(review): "upadateIndex" is spelled differently from the
+            // "updateIndex" used by RemoveOpperation; presumably it matches the
+            // method name declared on KahaDBStore - confirm before renaming.
+            this.store.upadateIndex(tx, this.command, this.location);
+        }
+
+        /**
+         * @return the add-message command supplied at construction time
+         */
+        public KahaAddMessageCommand getCommand() {
+            return this.command;
+        }
+    }
+
+    /**
+     * Operation bound to a {@link KahaRemoveMessageCommand}.
+     */
+    public static class RemoveOpperation extends Operation {
+
+        final KahaRemoveMessageCommand command;
+
+        /**
+         * @param store    the store the operation is applied to
+         * @param command  the remove-message command to hand to the store
+         * @param location the location associated with the command
+         */
+        public RemoveOpperation(KahaDBStore store, KahaRemoveMessageCommand command, Location location) {
+            super(store, location);
+            this.command = command;
+        }
+
+        /**
+         * Passes the transaction, command and location to the store.
+         */
+        @Override
+        public void execute(Transaction tx) throws IOException {
+            this.store.updateIndex(tx, this.command, this.location);
+        }
+
+        /**
+         * @return the remove-message command supplied at construction time
+         */
+        public KahaRemoveMessageCommand getCommand() {
+            return this.command;
+        }
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.util.Marshaller;
+
+/**
+ * Holder for the database state fields that {@link DBStateMarshaller} reads
+ * from a {@link DataInput} and writes to a {@link DataOutput}.
+ *
+ * The serialized form is, in order: the {@code state} int, the page id of the
+ * {@code destinations} index as a long, then for each of {@code lastUpdate}
+ * and {@code firstInProgressTransactionLocation} a boolean presence flag
+ * followed by the location payload when the flag is {@code true}.
+ */
+public class StoredDBState {
+
+    protected final KahaDBStore store;
+    protected Page<StoredDBState> page;
+    protected int state;
+    protected BTreeIndex<String, StoredDestinationState> destinations;
+    protected Location lastUpdate;
+    protected Location firstInProgressTransactionLocation;
+
+    /**
+     * @param store the store whose page file backs the destinations index
+     */
+    public StoredDBState(KahaDBStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Populates this instance from the serialized form described on the class.
+     * A location whose presence flag is {@code false} is set to {@code null}.
+     *
+     * @param is the input to read from
+     * @throws IOException if reading from {@code is} fails
+     */
+    public void read(DataInput is) throws IOException {
+        this.state = is.readInt();
+        this.destinations = new BTreeIndex<String, StoredDestinationState>(this.store.pageFile, is.readLong());
+        this.lastUpdate = readOptionalLocation(is);
+        this.firstInProgressTransactionLocation = readOptionalLocation(is);
+    }
+
+    /**
+     * Emits this instance in the serialized form described on the class.
+     *
+     * @param os the output to write to
+     * @throws IOException if writing to {@code os} fails
+     */
+    public void write(DataOutput os) throws IOException {
+        os.writeInt(this.state);
+        os.writeLong(this.destinations.getPageId());
+        writeOptionalLocation(this.lastUpdate, os);
+        writeOptionalLocation(this.firstInProgressTransactionLocation, os);
+    }
+
+    /** Reads a presence flag and, only when it is set, the location payload after it. */
+    private static Location readOptionalLocation(DataInput is) throws IOException {
+        if (!is.readBoolean()) {
+            return null;
+        }
+        return LocationMarshaller.INSTANCE.readPayload(is);
+    }
+
+    /** Writes a presence flag and, only when the location is non-null, its payload. */
+    private static void writeOptionalLocation(Location location, DataOutput os) throws IOException {
+        final boolean present = location != null;
+        os.writeBoolean(present);
+        if (present) {
+            LocationMarshaller.INSTANCE.writePayload(location, os);
+        }
+    }
+
+    /**
+     * {@link Marshaller} that delegates to {@link StoredDBState#read(DataInput)}
+     * and {@link StoredDBState#write(DataOutput)}.
+     */
+    public static class DBStateMarshaller implements Marshaller<StoredDBState> {
+
+        private final KahaDBStore store;
+
+        /**
+         * @param store the store given to every state instance this marshaller creates
+         */
+        public DBStateMarshaller(KahaDBStore store) {
+            this.store = store;
+        }
+
+        public Class<StoredDBState> getType() {
+            return StoredDBState.class;
+        }
+
+        /**
+         * Creates a new state bound to this marshaller's store and fills it from the input.
+         */
+        public StoredDBState readPayload(DataInput dataIn) throws IOException {
+            final StoredDBState loaded = new StoredDBState(store);
+            loaded.read(dataIn);
+            return loaded;
+        }
+
+        /**
+         * Writes the given state by calling its own {@code write} method.
+         */
+        public void writePayload(StoredDBState object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.TreeMap;
+
+import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.Marshaller;
+
+/**
+ * Per-destination state: the message indexes plus, for topics, the
+ * subscription indexes and bookkeeping maps.
+ *
+ * Only the index page ids go through {@link StoredDestinationMarshaller};
+ * {@code nextMessageId}, {@code subscriptionCursors} and {@code ackPositions}
+ * are neither read nor written by it.
+ */
+public class StoredDestinationState {
+
+    long nextMessageId;
+    BTreeIndex<Long, MessageKeys> orderIndex;
+    BTreeIndex<Location, Long> locationIndex;
+    BTreeIndex<String, Long> messageIdIndex;
+
+    // These bits are only set for Topics
+    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
+    BTreeIndex<String, Long> subscriptionAcks;
+    HashMap<String, Long> subscriptionCursors;
+    TreeMap<Long, HashSet<String>> ackPositions;
+
+    /**
+     * {@link Marshaller} for {@link StoredDestinationState}. The serialized
+     * form is three long page ids (order, location and message-id indexes), a
+     * boolean flag, and - only when the flag is {@code true} - two more long
+     * page ids (subscriptions and subscription acks).
+     */
+    public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
+
+        private final KahaDBStore store;
+
+        /**
+         * @param store the store whose page file backs every index created on read
+         */
+        public StoredDestinationMarshaller(KahaDBStore store) {
+            this.store = store;
+        }
+
+        public Class<StoredDestinationState> getType() {
+            return StoredDestinationState.class;
+        }
+
+        /**
+         * Builds a new state whose indexes are bound to the page ids read from
+         * the input. The subscription indexes stay {@code null} when the flag
+         * read after the first three ids is {@code false}.
+         */
+        public StoredDestinationState readPayload(DataInput dataIn) throws IOException {
+            final StoredDestinationState loaded = new StoredDestinationState();
+            loaded.orderIndex = new BTreeIndex<Long, MessageKeys>(store.pageFile, dataIn.readLong());
+            loaded.locationIndex = new BTreeIndex<Location, Long>(store.pageFile, dataIn.readLong());
+            loaded.messageIdIndex = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
+
+            // No subscription section was written for this destination.
+            if (!dataIn.readBoolean()) {
+                return loaded;
+            }
+
+            loaded.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(store.pageFile, dataIn.readLong());
+            loaded.subscriptionAcks = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
+            return loaded;
+        }
+
+        /**
+         * Writes the page ids of the state's indexes. The subscription section
+         * is emitted only when {@code value.subscriptions} is non-null.
+         */
+        public void writePayload(StoredDestinationState value, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.locationIndex.getPageId());
+            dataOut.writeLong(value.messageIdIndex.getPageId());
+
+            final boolean hasSubscriptions = value.subscriptions != null;
+            dataOut.writeBoolean(hasSubscriptions);
+            if (hasSubscriptions) {
+                dataOut.writeLong(value.subscriptions.getPageId());
+                dataOut.writeLong(value.subscriptionAcks.getPageId());
+            }
+        }
+    }
+}
\ No newline at end of file