You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/20 17:15:44 UTC
svn commit: r756565 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/...
Author: chirino
Date: Fri Mar 20 16:15:43 2009
New Revision: 756565
URL: http://svn.apache.org/viewvc?rev=756565&view=rev
Log:
More store work
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
this.flow = flow;
}
+ @Override
public void reserve(E elem) {
super.reserve(elem);
// if (!clientMode) {
@@ -25,14 +26,16 @@
// }
}
+ /*
public void releaseReserved(E elem) {
super.reserve(elem);
// if (!clientMode) {
// System.out.println(name + " Released Reserved " + this);
// }
- }
-
- protected void remove(int size) {
+ }*/
+
+ @Override
+ public void remove(long size) {
super.remove(size);
if (!clientMode) {
available += size;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.protobuf.AsciiBuffer;
public interface DeliveryTarget {
@@ -25,4 +26,7 @@
public boolean match(MessageDelivery message);
+ public boolean isDurable();
+
+ public AsciiBuffer getPersistentQueueName();
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
public interface MessageDelivery {
@@ -37,5 +38,18 @@
public <T> T asType(Class<T> type);
public boolean isPersistent();
-
+
+ /**
+ * Assigns a tracking number to this MessageDelivery. Tracking numbers
+ * are assigned sequentially and are unique within the broker.
+ */
+ public void setTrackingNumber(long tracking);
+
+ public long getTrackingNumber();
+
+ /**
+ * Returns the message's buffer representation.
+ * @return
+ */
+ public Buffer getMessageBuffer();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Fri Mar 20 16:15:43 2009
@@ -172,4 +172,13 @@
this.destination = destination;
}
+ public AsciiBuffer getPersistentQueueName() {
+ // TODO Auto-generated method stub
+ return destination.getName();
+ }
+
+ public boolean isDurable() {
+ return true;
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.command.Message;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
public class OpenWireMessageDelivery implements MessageDelivery {
@@ -27,6 +28,7 @@
private Destination destination;
private AsciiBuffer producerId;
private Runnable completionCallback;
+ private long tracking;
public OpenWireMessageDelivery(Message message) {
this.message = message;
@@ -85,4 +87,21 @@
return message.isPersistent();
}
+ public void setTrackingNumber(long tracking) {
+ this.tracking = tracking;
+ }
+
+ public long getTrackingNumber() {
+ return tracking;
+ }
+
+ /**
+ * Returns the message's buffer representation.
+ *
+ * @return
+ */
+ public Buffer getMessageBuffer() {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 20 16:15:43 2009
@@ -89,20 +89,20 @@
protected final Object inboundMutex = new Object();
protected IFlowController<MessageDelivery> inboundController;
-
+
protected BrokerConnection connection;
private OpenWireFormat wireFormat;
- private Router router;
-
+ private Router router;
+
public void start() throws Exception {
// Setup the inbound processing..
- final Flow flow = new Flow("broker-"+connection.getName()+"-inbound", false);
+ final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false);
SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
route(controller, elem);
}
-
+
public String toString() {
return flow.getFlowName();
}
@@ -113,7 +113,7 @@
}
public void onCommand(Object o) {
-
+
final Command command = (Command) o;
boolean responseRequired = command.isResponseRequired();
try {
@@ -198,13 +198,14 @@
// Control Methods
// /////////////////////////////////////////////////////////////////
public Response processWireFormat(WireFormatInfo info) throws Exception {
-
+
// Negotiate the openwire encoding options.
WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
wfn.sendWireFormat();
wfn.negociate(info);
-
- // Now that the encoding is negotiated.. let the client know the details about this
+
+ // Now that the encoding is negotiated.. let the client know
+ // the details about this
// broker.
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
@@ -294,8 +295,8 @@
// /////////////////////////////////////////////////////////////////
// Methods for cluster operations
- // These commands are sent to the broker when it's acting like a
- // client to another broker.
+ // These commands are sent to the broker when it's acting like a
+ // client to another broker.
// /////////////////////////////////////////////////////////////////
public Response processBrokerInfo(BrokerInfo info) throws Exception {
throw new UnsupportedOperationException();
@@ -308,11 +309,11 @@
public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
throw new UnsupportedOperationException();
}
-
+
public Response processProducerAck(ProducerAck info) throws Exception {
return ack(command);
}
-
+
});
} catch (Exception e) {
if (responseRequired) {
@@ -325,11 +326,11 @@
}
}
-
+
public void onException(Exception error) {
- if( !connection.isStopping() ) {
+ if (!connection.isStopping()) {
error.printStackTrace();
- new Thread(){
+ new Thread() {
@Override
public void run() {
try {
@@ -366,7 +367,7 @@
return null;
}
}
-
+
class ProducerContext {
private IFlowController<MessageDelivery> controller;
@@ -378,7 +379,7 @@
// Openwire only uses credit windows at the producer level for
// producers that request the feature.
if (info.getWindowSize() > 0) {
- final Flow flow = new Flow("broker-"+name+"-inbound", false);
+ final Flow flow = new Flow("broker-" + name + "-inbound", false);
WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
@Override
protected void sendCredit(int credit) {
@@ -407,7 +408,8 @@
private final ConsumerInfo info;
private String name;
private BooleanExpression selector;
-
+ private boolean durable;
+
private SingleFlowRelay<MessageDelivery> queue;
public WindowLimiter<MessageDelivery> limiter;
@@ -416,8 +418,8 @@
this.name = info.getConsumerId().toString();
selector = parseSelector(info);
- Flow flow = new Flow("broker-"+name+"-outbound", false);
- limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
+ Flow flow = new Flow("broker-" + name + "-outbound", false);
+ limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
public int getElementSize(MessageDelivery m) {
return 1;
}
@@ -436,7 +438,7 @@
}
public void ack(MessageAck info) {
- synchronized(queue) {
+ synchronized (queue) {
limiter.onProtocolCredit(info.getMessageCount());
}
}
@@ -462,6 +464,14 @@
}
}
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public AsciiBuffer getPersistentQueueName() {
+ return null;
+ }
+
}
protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
@@ -528,7 +538,7 @@
}
return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
}
-
+
private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
BooleanExpression rc = null;
if (info.getSelector() != null) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
@@ -30,6 +31,7 @@
private String receiptId;
private int priority = Integer.MIN_VALUE;
private AsciiBuffer msgId;
+ private long tracking = -1;
public StompMessageDelivery(StompFrame frame, Destination destiantion) {
this.frame = frame;
@@ -99,4 +101,23 @@
return "true".equals(p);
}
+ public long getTrackingNumber() {
+ return tracking ;
+ }
+
+ public void setTrackingNumber(long tracking) {
+ this.tracking = tracking;
+ }
+
+ /**
+ * Returns the message's buffer representation.
+ * @return
+ */
+ public Buffer getMessageBuffer()
+ {
+ //Todo use asType() instead?
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Mar 20 16:15:43 2009
@@ -263,6 +263,8 @@
private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
+ private boolean durable;
+
public ConsumerContext(final StompFrame subscribe) throws Exception {
translator = translator(subscribe);
@@ -369,6 +371,14 @@
// return false;
// }
}
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public AsciiBuffer getPersistentQueueName() {
+ return null;
+ }
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=756565&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri Mar 20 16:15:43 2009
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Semaphore;
+
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.RecordKey;
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.Session.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class BrokerDatabase {
+
+ private Store store;
+ private final Flow databaseFlow = new Flow("database", false);
+
+ private final SizeLimiter<Operation> storeLimiter;
+ private Thread flushThread;
+ private final ExclusiveQueue<Operation> opQueue;
+ private AtomicBoolean running = new AtomicBoolean(false);
+ private final Semaphore opsReady = new Semaphore(0);
+ private final FlowReadyListener<Operation> enqueueListener;
+ private DatabaseListener listener;
+
+ public interface DatabaseListener {
+ /**
+ * Called if there is a catastrophic problem with the database.
+ *
+ * @param ioe
+ * The causing exception.
+ */
+ public void onDatabaseException(IOException ioe);
+ }
+
+ public BrokerDatabase(Store store) {
+ storeLimiter = new SizeLimiter<Operation>(1024 * 512, 0) {
+ public int getElementSize(Operation op) {
+ return op.getLimiterSize();
+ }
+ };
+ opQueue = new ExclusiveQueue<Operation>(databaseFlow, "DataBaseQueue", storeLimiter);
+ enqueueListener = new FlowReadyListener<Operation>() {
+
+ public void onFlowReady(IPollableFlowSource<Operation> source) {
+ opsReady.release();
+ }
+ };
+ }
+
+ public synchronized void start() {
+ if (flushThread == null) {
+
+ running.set(true);
+ opsReady.drainPermits();
+ flushThread = new Thread(new Runnable() {
+
+ public void run() {
+ processOps();
+ }
+
+ }, "StoreThread");
+ flushThread.start();
+ }
+ }
+
+ public synchronized void stop() {
+ if (flushThread != null) {
+
+ running.set(false);
+ boolean interrupted = false;
+ while (true) {
+ opsReady.release();
+ try {
+ flushThread.join();
+ break;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ flushThread = null;
+ }
+ }
+
+ public void persistReceivedMessage(MessageDelivery delivery, Collection<DeliveryTarget> targets, ISourceController<?> source) {
+ add(new AddMessageOperation(delivery, targets), source, true);
+ }
+
+ /**
+ * Executes user supplied {@link Operation}. If the {@link Operation} does
+ * not throw any Exceptions, all updates to the store are committed,
+ * otherwise they are rolled back. Any exceptions thrown by the
+ * {@link Operation} are propagated by this method.
+ *
+ * If limiter space on the store processing queue is exceeded, the
+ * controller will be blocked.
+ *
+ * If this method is called with flush set to
+ * <code>false</false> there is no
+ * guarantee made about when the operation will be executed. If <code>flush</code>
+ * is <code>true</code> and {@link Operation#isDelayable()} is also
+ * <code>true</code> then an attempt will be made to execute the event at
+ * the {@link Store}'s configured delay interval.
+ *
+ * @param op
+ * The operation to execute
+ * @param flush
+ * Whether or not this operation needs immediate processing.
+ * @param controller
+ * the source of the operation.
+ */
+ private void add(Operation op, ISourceController<?> controller, boolean flush) {
+ opQueue.add(op, controller);
+ }
+
+ private final void processOps() {
+ while (running.get()) {
+ final Operation firstOp;
+ synchronized (opQueue) {
+ firstOp = opQueue.poll();
+ if (firstOp == null) {
+ opQueue.addFlowReadyListener(enqueueListener);
+ opsReady.acquireUninterruptibly();
+ continue;
+ }
+ }
+
+ // The first operation we get, triggers a store transaction.
+ if (firstOp != null) {
+ final ArrayList<Operation> processedQueue = new ArrayList<Operation>();
+ try {
+ store.execute(new Store.VoidCallback<Exception>(){
+ @Override
+ public void run(Session session) throws Exception {
+
+ // Try to execute the operation against the session...
+ try {
+ firstOp.execute(session);
+ processedQueue.add(firstOp);
+ } catch (CancellationException ignore) {
+ }
+
+ // See if we can batch up some additional operations in
+ // this transaction.
+
+ Operation op;
+ synchronized (opQueue) {
+ op = opQueue.poll();
+ if (op != null) {
+ try {
+ firstOp.execute(session);
+ processedQueue.add(op);
+ } catch (CancellationException ignore) {
+ }
+ }
+ }
+ }
+ });
+ // Wait for the operations to commit.
+ for (Operation processed : processedQueue) {
+ processed.onCommit();
+ }
+ } catch (IOException e) {
+ for (Operation processed : processedQueue) {
+ processed.onRollback(e);
+ }
+ onDatabaseException(e);
+ } catch (RuntimeException e) {
+ for (Operation processed : processedQueue) {
+ processed.onRollback(e);
+ }
+ } catch (Exception e) {
+ for (Operation processed : processedQueue) {
+ processed.onRollback(e);
+ }
+
+ }
+ }
+ }
+ }
+
+ private void onDatabaseException(IOException ioe) {
+ if (listener != null) {
+ listener.onDatabaseException(ioe);
+ }
+ }
+
+ /**
+ * This interface is used to execute transacted code.
+ *
+ * It is used by the {@link Store#execute(Callback)} method, often as
+ * anonymous class.
+ */
+ public interface Operation {
+
+ /**
+ * Gets called by the
+ * {@link Store#add(Operation, ISourceController, boolean)} method
+ * within a transactional context. If any exception is thrown including
+ * Runtime exception, the transaction is rolled back.
+ *
+ * @param session
+ * provides you access to read and update the persistent
+ * data.
+ * @return the result of the CallableCallback
+ * @throws CancellationException
+ * if the operation has been canceled. If this is thrown,
+ * the {@link #onCommit()} and {@link #onRollback()} methods
+ * will not be called.
+ * @throws Exception
+ * if an system error occured while executing the
+ * operations.
+ * @throws RuntimeException
+ * if an system error occured while executing the
+ * operations.
+ */
+ public void execute(Session session) throws CancellationException, Exception, RuntimeException;
+
+ /**
+ * Returns true if this operation can be delayed. This is useful in
+ * cases where external events can negate the need to execute the
+ * operation. The delay interval is not guaranteed to be honored, if
+ * subsequent events or other store flush policy/criteria requires a
+ * flush of subsequent events.
+ *
+ * @return True if the operation can be delayed.
+ */
+ public boolean isDelayable();
+
+ /**
+ * Attempts to cancel the store operation. Returns true if the operation
+ * could be canceled or false if the operation was already executed by
+ * the store.
+ *
+ * @return true if the operation could be canceled
+ */
+ public boolean cancel();
+
+ /**
+ * Returns the size to be used when calculating how much space this
+ * operation takes on the store processing queue.
+ *
+ * @return The limiter size to be used.
+ */
+ public int getLimiterSize();
+
+ /**
+ * Called after {@link #execute(Session)} is called and the the
+ * operation has been committed.
+ */
+ public void onCommit();
+
+ /**
+ * Called after {@link #execute(Session)} is called and the the
+ * operation has been rolled back.
+ */
+ public void onRollback(Throwable error);
+ }
+
+ /**
+ * This is a convenience base class that can be used to implement
+ * Operations. It handles operation cancellation for you.
+ */
+ public abstract class OperationBase implements Operation {
+ final private AtomicBoolean executePending = new AtomicBoolean(true);
+
+ public boolean cancel() {
+ return executePending.compareAndSet(true, false);
+ }
+
+ public void execute(Session session) throws CancellationException {
+ if (executePending.compareAndSet(true, false)) {
+ doExcecute(session);
+ } else {
+ throw new CancellationException();
+ }
+ }
+
+ abstract protected void doExcecute(Session session);
+
+ public int getLimiterSize() {
+ return 0;
+ }
+
+ public boolean isDelayable() {
+ return false;
+ }
+
+ public void onCommit() {
+ }
+
+ public void onRollback() {
+ }
+ }
+
+ private class AddMessageOperation extends OperationBase {
+ private final MessageDelivery delivery;
+ private final Collection<DeliveryTarget> targets;
+ private final Buffer messageBuffer;
+
+ public AddMessageOperation(MessageDelivery delivery, Collection<DeliveryTarget> targets) {
+ this.delivery = delivery;
+ this.messageBuffer = delivery.getMessageBuffer();
+ this.targets = targets;
+
+ }
+
+ public int getLimiterSize() {
+ return delivery.getFlowLimiterSize();
+ }
+
+ @Override
+ protected void doExcecute(Session session) {
+ // TODO need to get at protocol buffer.
+ RecordKey key = session.messageAdd(delivery.getMsgId(), messageBuffer);
+ for(DeliveryTarget target : targets)
+ {
+ try {
+ session.queueAddMessage(new AsciiBuffer(target.getPersistentQueueName()), key, null);
+ } catch (QueueNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (DuplicateKeyException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void onRollback(Throwable error) {
+ // TODO Auto-generated method stub
+ }
+
+ public void onCommit() {
+ // Notify that we've saved the message.
+ delivery.getCompletionCallback().run();
+ }
+
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Fri Mar 20 16:15:43 2009
@@ -1,23 +1,65 @@
package org.apache.activemq.broker.store;
-import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.CancellationException;
-import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Interface to persistently store and access data needed by the messaging
* system.
- *
*/
public interface Store {
+
+ /**
+ * This interface is used to execute transacted code.
+ *
+ * It is used by the {@link Store#execute(Callback)} method, often as
+ * anonymous class.
+ */
+ public interface Callback<R, T extends Exception> {
- public interface RecordKey {
+ /**
+ * Gets called by the {@link Store#execute(Callback)} method
+ * within a transactional context. If any exception is thrown including
+ * Runtime exception, the transaction is rolled back.
+ *
+ * @param session
+ * provides you access to read and update the persistent
+ * data.
+ * @return the result of the Callback
+ * @throws T
+ * if an system error occured while executing the
+ * operations.
+ */
+ public R execute(Session session) throws T;
+ }
+
+ /**
+ * Convenience class which allows you to implement {@link Callback} classes which do not return a value.
+ */
+ public abstract class VoidCallback <T extends Exception> implements Callback<Object, T> {
+
+ /**
+ * Gets called by the {@link Store#execute(VoidCallback)} method within a transactional context.
+ * If any exception is thrown including Runtime exception, the transaction is rolled back.
+ *
+ * @param session provides you access to read and update the persistent data.
+ * @throws T if an error occurs and the transaction should get rolled back
+ */
+ abstract public void run(Session session) throws T;
+
+ final public Object execute(Session session) throws T {
+ run(session);
+ return null;
+ }
+ }
+
+ public <R, T extends Exception> R execute(Callback<R,T> callback) throws T;
+
+ interface RecordKey {
+
}
/**
@@ -29,177 +71,56 @@
*/
public interface Session {
+ public class DuplicateKeyException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public DuplicateKeyException(String message) {
+ super(message);
+ }
+ }
+
+ public class QueueNotFoundException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public QueueNotFoundException(String message) {
+ super(message);
+ }
+ }
+
// Message related methods.
public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
-
public RecordKey messageGetKey(AsciiBuffer messageId);
-
public Buffer messageGet(RecordKey key);
// Message Chunking related methods.
public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message);
-
public void messageChunkAdd(RecordKey key, Buffer message);
-
public void messageChunkClose(RecordKey key);
-
public Buffer messageChunkGet(RecordKey key, int offset, int max);
// / Queue related methods.
- public Iterator<AsciiBuffer> queueList(AsciiBuffer first, int max);
-
+ public Iterator<AsciiBuffer> queueList(AsciiBuffer first);
public void queueAdd(AsciiBuffer queue);
-
public boolean queueRemove(AsciiBuffer queue);
-
- public void queueAddMessage(AsciiBuffer queue, RecordKey key);
-
- public void queueRemoveMessage(AsciiBuffer queue, RecordKey key);
-
- public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
+ public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException;
+ public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException;
+ public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
// We could use this to associate additional data to a message on a
- // queue like
- // which consumer a message has been dispatched to.
- public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey key, Buffer attachment);
+ // queue like which consumer a message has been dispatched to.
+ // public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey
+ // key, Buffer attachment) throws QueueNotFoundException;
- public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey key);
+ // public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
+ // key) throws QueueNotFoundException;
// / Simple Key Value related methods could come in handy to store misc
// data.
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
-
public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value);
-
public Buffer mapGet(AsciiBuffer map, Buffer key);
-
public Buffer mapRemove(AsciiBuffer map, Buffer key);
-
public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max);
}
-
- /**
- * This interface is used to execute transacted code.
- *
- * It is used by the {@link Store#execute(Callback)} method, often as
- * anonymous class.
- */
- public interface Operation {
-
- /**
- * Gets called by the {@link Store#add(Operation, ISourceController, boolean)} method within a
- * transactional context. If any exception is thrown including Runtime
- * exception, the transaction is rolled back.
- *
- * @param session
- * provides you access to read and update the persistent
- * data.
- * @return the result of the CallableCallback
- * @throws CancellationException
- * if the operation has been canceled. If this is thrown,
- * the {@link #onCommit()} and {@link #onRollback()} methods will
- * not be called.
- * @throws Exception
- * if an system error occured while executing the operations.
- * @throws RuntimeException
- * if an system error occured while executing the operations.
- */
- public void execute(Session session) throws CancellationException, Exception, RuntimeException;
-
- /**
- * Returns true if this operation can be delayed. This is useful in cases
- * where external events can negate the need to execute the operation. The delay
- * interval is not guaranteed to be honored, if subsequent events or other
- * store flush policy/criteria requires a flush of subsequent events.
- *
- * @return True if the operation can be delayed.
- */
- public boolean isDelayable();
-
- /**
- * Attempts to cancel the store operation. Returns true if the operation
- * could be canceled or false if the operation was already executed by the
- * store.
- *
- * @return true if the operation could be canceled
- */
- public boolean cancel();
-
- /**
- * Returns the size to be used when calculating how much space this operation
- * takes on the store processing queue.
- *
- * @return The limiter size to be used.
- */
- public long getLimiterSize();
-
- /**
- * Called after {@link #execute(Session)} is called and the the operation has been committed.
- */
- public void onCommit();
-
- /**
- * Called after {@link #execute(Session)} is called and the the operation has been rolled back.
- */
- public void onRollback(Throwable error);
- }
-
- /**
- * This is a convenience base class that can be used to implement Operations.
- * It handles operation cancellation for you.
- */
- public abstract class OperationBase implements Operation {
- final private AtomicBoolean executePending = new AtomicBoolean(true);
-
- public boolean cancel() {
- return executePending.compareAndSet(true, false);
- }
-
- public void execute(Session session) throws CancellationException {
- if( executePending.compareAndSet(true, false) ) {
- doExcecute(session);
- } else {
- throw new CancellationException();
- }
- }
-
- abstract protected void doExcecute(Session session);
-
- public long getLimiterSize() {
- return 0;
- }
-
- public boolean isDelayable() {
- return false;
- }
-
- public void onCommit() {
- }
-
- public void onRollback() {
- }
- }
-
- /**
- * Executes user supplied {@link Operation}. If the {@link Operation} does not
- * throw any Exceptions, all updates to the store are committed, otherwise
- * they are rolled back. Any exceptions thrown by the {@link Operation} are
- * propagated by this method.
- *
- * If limiter space on the store processing queue is exceeded, the controller will be
- * blocked.
- *
- * If this method is called with flush set to <code>false</false> there is no
- * guarantee made about when the operation will be executed. If <code>flush</code> is
- * <code>true</code> and {@link Operation#isDelayable()} is also <code>true</code>
- * then an attempt will be made to execute the event at the {@link Store}'s configured
- * delay interval.
- *
- * @param op The operation to execute
- * @param flush Whether or not this operation needs immediate processing.
- * @param controller the source of the operation.
- */
- public void add(Operation op, ISourceController<?> controller, boolean flush);
-
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Mar 20 16:15:43 2009
@@ -197,13 +197,13 @@
}
}
- public void beginTransaction(ConnectionContext context) throws IOException {
+ public void beginTransaction() throws IOException {
throw new IOException("Not yet implemented.");
}
- public void commitTransaction(ConnectionContext context) throws IOException {
+ public void commitTransaction() throws IOException {
throw new IOException("Not yet implemented.");
}
- public void rollbackTransaction(ConnectionContext context) throws IOException {
+ public void rollbackTransaction() throws IOException {
throw new IOException("Not yet implemented.");
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=756565&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Mar 20 16:15:43 2009
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+
+
+public class MemoryStore implements Store {
+
+ MemorySession session = new MemorySession();
+
+ private class MemorySession implements Session {
+ private HashMap<RecordKey, Buffer> messages = new HashMap<RecordKey, Buffer>();
+ private HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>> queues = new HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>>();
+
+ // private HashMap<String, LinkedList<RecordKey>> queues = new
+ // HashMap<String, LinkedList<RecordKey>>();
+
+ public void beginTx() {
+
+ }
+
+ public void commitTx() {
+
+ }
+
+ public void rollback() {
+ throw new UnsupportedOperationException();
+ }
+
+ // //////////////////////////////////////////////////////////////////////////////
+ // Message related methods.
+ // ///////////////////////////////////////////////////////////////////////////////
+ public RecordKey messageAdd(AsciiBuffer messageId, Buffer message) {
+ RecordKey key = new MemoryRecordKey(messageId);
+ messages.put(key, message);
+ return key;
+ }
+
+ public Buffer messageGet(RecordKey key) {
+ return messages.get(key);
+ }
+
+ public RecordKey messageGetKey(AsciiBuffer messageId) {
+ MemoryRecordKey key = new MemoryRecordKey(messageId);
+ return messages.containsKey(key) ? key : null;
+ }
+
+ // //////////////////////////////////////////////////////////////////////////////
+ // Queue related methods.
+ // ///////////////////////////////////////////////////////////////////////////////
+ public void queueAdd(AsciiBuffer queue) {
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+ if (messages == null) {
+ messages = new TreeMap<RecordKey, Buffer>();
+ queues.put(queue, messages);
+ }
+ }
+
+ public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException {
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+ if (messages != null) {
+ if (messages.put(key, attachment) != null) {
+ throw new DuplicateKeyException("");
+ }
+ } else {
+ throw new QueueNotFoundException(queue.toString());
+ }
+ }
+
+ public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException {
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+ if (messages != null) {
+ messages.remove(key);
+ } else {
+ throw new QueueNotFoundException(queue.toString());
+ }
+ }
+
+ public Iterator<AsciiBuffer> queueList(AsciiBuffer first) {
+ ArrayList<AsciiBuffer> list = new ArrayList<AsciiBuffer>(queues.size());
+ for (AsciiBuffer queue : queues.keySet()) {
+ list.add(queue);
+ }
+ return list.iterator();
+ }
+
+ public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
+ ArrayList<Buffer> list = new ArrayList<Buffer>(max);
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+ if (messages != null) {
+ for (RecordKey key : messages.tailMap(firstRecord).keySet() ) {
+ list.add(messages.get(key));
+ if (list.size() == max) {
+ break;
+ }
+ }
+ }
+ return list.iterator();
+ }
+
+ public boolean queueRemove(AsciiBuffer queue) {
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+ if (messages != null) {
+ Iterator<RecordKey> msgKeys = messages.keySet().iterator();
+ while (msgKeys.hasNext()) {
+ RecordKey msgKey = msgKeys.next();
+ try {
+ queueRemoveMessage(queue, msgKey);
+ } catch (QueueNotFoundException e) {
+ // Can't happen.
+ }
+ }
+ queues.remove(queue.toString());
+
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * public void queueUpdateMessageAttachment(AsciiBuffer queue, RecordKey
+ * key, Buffer attachment) { // TODO Auto-generated method stub }
+ *
+ * public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
+ * key) throws QueueNotFoundException { TreeMap<RecordKey, Buffer>
+ * messages = queues.get(queue); if (messages != null) {
+ * messages.add(key); } else { throw new
+ * QueueNotFoundException(queue.toString()); } }
+ */
+
+ // //////////////////////////////////////////////////////////////////////////////
+ // Simple Key Value related methods could come in handy to store misc
+ // data.
+ // ///////////////////////////////////////////////////////////////////////////////
+ public Buffer mapGet(AsciiBuffer map, Buffer key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Buffer mapRemove(AsciiBuffer map, Buffer key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value) {
+ throw new UnsupportedOperationException();
+ }
+
+ // ///////////////////////////////////////////////////////////////////////////////
+ // Message Chunking related methods
+ // ///////////////////////////////////////////////////////////////////////////////
+ public void messageChunkAdd(RecordKey key, Buffer message) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void messageChunkClose(RecordKey key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Buffer messageChunkGet(RecordKey key, int offset, int max) {
+ throw new UnsupportedOperationException();
+ }
+
+ public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ final private class MemoryRecordKey implements RecordKey {
+ final AsciiBuffer messageId;
+
+ MemoryRecordKey(AsciiBuffer messageId) {
+ this.messageId = messageId;
+ }
+
+ @Override
+ public int hashCode() {
+ return messageId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if( obj == null || obj.getClass()!=MemoryRecordKey.class )
+ return false;
+ if( this == obj )
+ return true;
+ MemoryRecordKey key = (MemoryRecordKey)obj;
+ return messageId.equals(key.messageId);
+ }
+ }
+
+ public <R, T extends Exception> R execute(Callback<R, T> callback) throws T {
+ return callback.execute(session);
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto Fri Mar 20 16:15:43 2009
@@ -106,3 +106,21 @@
required int32 log_id = 1;
required int32 offset = 2;
}
+
+message KahaQueueDef {
+ enum DestinationType {
+ EXCLUSIVE = 0
+ SHARED = 1
+ }
+
+ required string name = 1;
+ required int64 id = 2
+ optional int64 size = 3;
+ optional int64 save_extent = 4;
+ optional int64 resume_threshold = 5;
+}
+
+message KahaUndelMessageRecord {
+ required int64 message_id = 1;
+ required int64 queue_id = 2;
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Fri Mar 20 16:15:43 2009
@@ -164,7 +164,7 @@
outboundLimiter = createProtocolLimiter(true, outboundFlow, outputWindowSize, outputResumeThreshold);
if (transport.narrow(DispatchableTransport.class) == null) {
- blockingTransport = false;
+ blockingTransport = true;
blockingWriter = Executors.newSingleThreadExecutor();
}
@@ -408,7 +408,8 @@
this.flow = flow;
}
- protected void remove(int size) {
+ @Override
+ public void remove(long size) {
super.remove(size);
if (!clientMode) {
available += size;