You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/18 20:53:51 UTC

svn commit: r755716 [2/2] - /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.broker.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+
+public abstract class Operation {
+    
+    protected final KahaDBStore store;
+    final Location location;
+
+    public Operation(KahaDBStore store, Location location) {
+        this.store = store;
+        this.location = location;
+    }
+
+    public Location getLocation() {
+        return location;
+    }
+
+    abstract public void execute(Transaction tx) throws IOException;
+    
+    
+    public static class AddOpperation extends Operation {
+        final KahaAddMessageCommand command;
+
+        public AddOpperation(KahaDBStore store, KahaAddMessageCommand command, Location location) {
+            super(store, location);
+            this.command = command;
+        }
+
+        public void execute(Transaction tx) throws IOException {
+            store.upadateIndex(tx, command, location);
+        }
+
+        public KahaAddMessageCommand getCommand() {
+            return command;
+        }
+    }
+    
+    public static class RemoveOpperation extends Operation {
+
+        final KahaRemoveMessageCommand command;
+
+        public RemoveOpperation(KahaDBStore store, KahaRemoveMessageCommand command, Location location) {
+            super(store, location);
+            this.command = command;
+        }
+
+        public void execute(Transaction tx) throws IOException {
+            store.updateIndex(tx, command, location);
+        }
+
+        public KahaRemoveMessageCommand getCommand() {
+            return command;
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.util.Marshaller;
+
+public class StoredDBState {
+    
+    protected final KahaDBStore store;
+    protected Page<StoredDBState> page;
+    protected int state;
+    protected BTreeIndex<String, StoredDestinationState> destinations;
+    protected Location lastUpdate;
+    protected Location firstInProgressTransactionLocation;
+
+    public StoredDBState(KahaDBStore store) {
+        this.store = store;
+    }
+
+
+    public void read(DataInput is) throws IOException {
+        state = is.readInt();
+        destinations = new BTreeIndex<String, StoredDestinationState>(store.pageFile, is.readLong());
+        if (is.readBoolean()) {
+            lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+        } else {
+            lastUpdate = null;
+        }
+        if (is.readBoolean()) {
+            firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
+        } else {
+            firstInProgressTransactionLocation = null;
+        }
+    }
+
+    public void write(DataOutput os) throws IOException {
+        os.writeInt(state);
+        os.writeLong(destinations.getPageId());
+
+        if (lastUpdate != null) {
+            os.writeBoolean(true);
+            LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
+        } else {
+            os.writeBoolean(false);
+        }
+
+        if (firstInProgressTransactionLocation != null) {
+            os.writeBoolean(true);
+            LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
+        } else {
+            os.writeBoolean(false);
+        }
+    }
+    
+    static public class DBStateMarshaller implements Marshaller<StoredDBState> {
+        private final KahaDBStore store;
+
+        public DBStateMarshaller(KahaDBStore store) {
+            this.store = store;
+        }
+
+        public Class<StoredDBState> getType() {
+            return StoredDBState.class;
+        }
+
+        public StoredDBState readPayload(DataInput dataIn) throws IOException {
+            StoredDBState rc = new StoredDBState(this.store);
+            rc.read(dataIn);
+            return rc;
+        }
+
+        public void writePayload(StoredDBState object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java?rev=755716&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java Wed Mar 18 19:53:51 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.TreeMap;
+
+import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.Marshaller;
+
+public class StoredDestinationState {
+    long nextMessageId;
+    BTreeIndex<Long, MessageKeys> orderIndex;
+    BTreeIndex<Location, Long> locationIndex;
+    BTreeIndex<String, Long> messageIdIndex;
+
+    // These bits are only set for Topics
+    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
+    BTreeIndex<String, Long> subscriptionAcks;
+    HashMap<String, Long> subscriptionCursors;
+    TreeMap<Long, HashSet<String>> ackPositions;
+    
+    public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
+        private final KahaDBStore store;
+
+        public StoredDestinationMarshaller(KahaDBStore store) {
+            this.store = store;
+        }
+
+        public Class<StoredDestinationState> getType() {
+            return StoredDestinationState.class;
+        }
+
+        public StoredDestinationState readPayload(DataInput dataIn) throws IOException {
+            StoredDestinationState value = new StoredDestinationState();
+            value.orderIndex = new BTreeIndex<Long, MessageKeys>(store.pageFile, dataIn.readLong());
+            value.locationIndex = new BTreeIndex<Location, Long>(store.pageFile, dataIn.readLong());
+            value.messageIdIndex = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
+
+            if (dataIn.readBoolean()) {
+                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(store.pageFile, dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
+            }
+            return value;
+        }
+
+        public void writePayload(StoredDestinationState value, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.locationIndex.getPageId());
+            dataOut.writeLong(value.messageIdIndex.getPageId());
+            if (value.subscriptions != null) {
+                dataOut.writeBoolean(true);
+                dataOut.writeLong(value.subscriptions.getPageId());
+                dataOut.writeLong(value.subscriptionAcks.getPageId());
+            } else {
+                dataOut.writeBoolean(false);
+            }
+        }
+    }
+}
\ No newline at end of file