You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC
svn commit: r1765973 [3/7] - in /qpid/java/branches/transfer-queue:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/stor...
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Oct 21 09:32:07 2016
@@ -30,15 +30,16 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
+import org.apache.qpid.server.message.BaseMessageInstance;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -55,7 +56,7 @@ public abstract class QueueEntryImpl imp
private final MessageReference _message;
- private Set<Long> _rejectedBy = null;
+ private Set<Object> _rejectedBy = null;
private static final EntryState HELD_STATE = new EntryState()
{
@@ -177,7 +178,7 @@ public abstract class QueueEntryImpl imp
return _entryId;
}
- public Queue<?> getQueue()
+ public RecoverableBaseQueue getQueue()
{
return _queueEntryList.getQueue();
}
@@ -263,7 +264,7 @@ public abstract class QueueEntryImpl imp
boolean acquired = acquire();
if(!acquired)
{
- QueueConsumer consumer = getDeliveredConsumer();
+ AcquiringMessageInstanceConsumer<?,?> consumer = getDeliveredConsumer();
acquired = removeAcquisitionFromConsumer(consumer);
if(acquired)
{
@@ -304,19 +305,19 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquire(ConsumerImpl sub)
+ public boolean acquire(MessageInstanceConsumer sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
+ final boolean acquired = acquire(((AcquiringMessageInstanceConsumer<?,?>) sub).getOwningState().getUnstealableState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
- getQueue().incrementUnackedMsgCount(this);
+ _queueEntryList.onAcquiredByConsumer(this, sub);
}
return acquired;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(state instanceof StealableConsumerAcquiredState
@@ -357,9 +358,9 @@ public abstract class QueueEntryImpl imp
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public MessageInstanceConsumer getAcquiringConsumer()
{
- ConsumerImpl consumer;
+ AcquiringMessageInstanceConsumer<?,?> consumer;
EntryState state = _state;
if (state instanceof ConsumerAcquiredState)
{
@@ -373,14 +374,14 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean isAcquiredBy(ConsumerImpl consumer)
+ public boolean isAcquiredBy(MessageInstanceConsumer consumer)
{
EntryState state = _state;
return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
}
@Override
- public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(state instanceof StealableConsumerAcquiredState
@@ -411,7 +412,7 @@ public abstract class QueueEntryImpl imp
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
@@ -424,12 +425,12 @@ public abstract class QueueEntryImpl imp
{
if (previousState instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ _queueEntryList.onNoLongerAcquiredByConsumer(this);
}
if(!getQueue().isDeleted())
{
- getQueue().requeue(this);
+ _queueEntryList.requeue(this);
if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
{
notifyStateChange(previousState, AVAILABLE_STATE);
@@ -448,7 +449,7 @@ public abstract class QueueEntryImpl imp
EntryState state;
while((state = _state).getState() == State.AVAILABLE)
{
- boolean isHeld = getQueue().isHeld(this, evaluationTime);
+ boolean isHeld = _queueEntryList.isHeld(this, evaluationTime);
if(state == AVAILABLE_STATE && isHeld)
{
if(!_stateUpdater.compareAndSet(this, state, HELD_STATE))
@@ -475,20 +476,20 @@ public abstract class QueueEntryImpl imp
}
@Override
- public QueueConsumer getDeliveredConsumer()
+ public AcquiringMessageInstanceConsumer getDeliveredConsumer()
{
- return (QueueConsumer) getAcquiringConsumer();
+ return (AcquiringMessageInstanceConsumer) getAcquiringConsumer();
}
public void reject()
{
- QueueConsumer consumer = getDeliveredConsumer();
+ AcquiringMessageInstanceConsumer<?,?> consumer = getDeliveredConsumer();
if (consumer != null)
{
if (_rejectedBy == null)
{
- _rejectedBy = new HashSet<Long>();
+ _rejectedBy = new HashSet<>();
}
_rejectedBy.add(consumer.getConsumerNumber());
@@ -499,12 +500,12 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(ConsumerImpl consumer)
+ public boolean isRejectedBy(MessageInstanceConsumer consumer)
{
if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(consumer.getConsumerNumber());
+ return _rejectedBy.contains(consumer.getIdentifier());
}
else // This message hasn't been rejected yet.
{
@@ -525,10 +526,10 @@ public abstract class QueueEntryImpl imp
{
if (state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ _queueEntryList.onNoLongerAcquiredByConsumer(this);
}
- getQueue().dequeue(this);
+ _queueEntryList.dequeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(state, DEQUEUED_STATE);
@@ -582,55 +583,10 @@ public abstract class QueueEntryImpl imp
}
}
- public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super BaseMessageInstance> action, ServerTransaction txn)
{
- if (!isAcquired())
- {
- throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
- }
-
- final Queue<?> currentQueue = getQueue();
- Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
- boolean autocommit = txn == null;
- int enqueues;
-
- if(autocommit)
- {
- txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
- }
-
- if (alternateExchange != null)
- {
- enqueues = alternateExchange.send(getMessage(),
- getMessage().getInitialRoutingAddress(),
- getInstanceProperties(),
- txn,
- action);
- }
- else
- {
- enqueues = 0;
- }
+ return _queueEntryList.routeToAlternate(this, action, txn);
- txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
- {
- public void postCommit()
- {
- delete();
- }
-
- public void onRollback()
- {
-
- }
- });
-
- if(autocommit)
- {
- txn.commit();
- }
-
- return enqueues;
}
public boolean isQueueDeleted()
@@ -692,7 +648,7 @@ public abstract class QueueEntryImpl imp
@Override
public int getMaximumDeliveryCount()
{
- return getQueue().getMaximumDeliveryAttempts();
+ return _queueEntryList.getMaximumDeliveryAttempts();
}
public void incrementDeliveryCount()
@@ -723,7 +679,7 @@ public abstract class QueueEntryImpl imp
@Override
public boolean resend()
{
- QueueConsumer sub = getDeliveredConsumer();
+ AcquiringMessageInstanceConsumer<?,?> sub = getDeliveredConsumer();
if(sub != null)
{
return sub.resend(this);
@@ -734,7 +690,7 @@ public abstract class QueueEntryImpl imp
@Override
public TransactionLogResource getOwningResource()
{
- return getQueue();
+ return _queueEntryList.getQueue();
}
public void setRedelivered()
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
public interface QueueEntryList
{
- Queue<?> getQueue();
+ RecoverableBaseQueue getQueue();
QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord);
@@ -44,4 +47,17 @@ public interface QueueEntryList
int getPriorities();
+ void onAcquiredByConsumer(QueueEntry queueEntry, final MessageInstanceConsumer consumer);
+
+ void onNoLongerAcquiredByConsumer(QueueEntry queueEntry);
+
+ void requeue(QueueEntry queueEntry);
+
+ boolean isHeld(QueueEntry queueEntry, long evaluationTime);
+
+ void dequeue(QueueEntry queueEntry);
+
+ int routeToAlternate(QueueEntry queueEntry, Action<? super BaseMessageInstance> action, ServerTransaction txn);
+
+ int getMaximumDeliveryAttempts();
}
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+
+public interface RecoverableBaseQueue extends BaseQueue
+{
+ void recover(ServerMessage<?> message, MessageEnqueueRecord record);
+
+ void completeRecovery();
+
+ VirtualHost<?> getVirtualHost();
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
/**
* A sorted implementation of QueueEntryList.
@@ -52,6 +56,50 @@ public class SortedQueueEntryList implem
return _queue;
}
+
+ @Override
+ public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+ {
+ getQueue().incrementUnackedMsgCount(queueEntry);
+ }
+
+ @Override
+ public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+ {
+ getQueue().decrementUnackedMsgCount(queueEntry);
+ }
+
+ @Override
+ public void requeue(final QueueEntry queueEntry)
+ {
+ getQueue().requeue(queueEntry);
+ }
+
+ @Override
+ public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+ {
+ return getQueue().isHeld(queueEntry, evaluationTime);
+ }
+
+ @Override
+ public void dequeue(final QueueEntry queueEntry)
+ {
+ getQueue().dequeue(queueEntry);
+ }
+
+ @Override
+ public int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+ {
+ return getQueue().routeToAlternate(queueEntry, action, txn);
+ }
+
+ @Override
+ public int getMaximumDeliveryAttempts()
+ {
+ return getQueue().getMaximumDeliveryAttempts();
+ }
+
+
public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
synchronized(_lock)
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Fri Oct 21 09:32:07 2016
@@ -37,9 +37,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.KeyStoreMessages;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.IntegrityViolationException;
import org.apache.qpid.server.model.KeyStore;
@@ -115,7 +115,7 @@ public abstract class AbstractKeyStore<X
else
{
final int frequency = checkFrequency;
- getBroker().addChangeListener(new ConfigurationChangeListener()
+ getBroker().addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
@@ -135,38 +135,6 @@ public abstract class AbstractKeyStore<X
}
}
- @Override
- public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void attributeSet(final ConfiguredObject<?> object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
-
- }
-
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
});
}
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java Fri Oct 21 09:32:07 2016
@@ -22,7 +22,7 @@ import java.util.Map;
import javax.security.auth.Subject;
-import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.PermissionedObject;
import org.apache.qpid.server.security.access.Operation;
public interface AccessControl<T extends SecurityToken>
@@ -34,9 +34,9 @@ public interface AccessControl<T extends
T newToken(Subject subject);
- Result authorise(T token, Operation operation, ConfiguredObject<?> configuredObject);
+ Result authorise(T token, Operation operation, PermissionedObject configuredObject);
- Result authorise(T token, Operation operation, ConfiguredObject<?> configuredObject, Map<String,Object> arguments);
+ Result authorise(T token, Operation operation, PermissionedObject configuredObject, Map<String,Object> arguments);
final class FixedResultAccessControl implements AccessControl<SecurityToken>
{
@@ -68,7 +68,7 @@ public interface AccessControl<T extends
@Override
public Result authorise(final SecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject)
+ final PermissionedObject configuredObject)
{
return _result;
}
@@ -76,7 +76,7 @@ public interface AccessControl<T extends
@Override
public Result authorise(final SecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject,
+ final PermissionedObject configuredObject,
final Map<String, Object> arguments)
{
return _result;
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java Fri Oct 21 09:32:07 2016
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
-import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.PermissionedObject;
import org.apache.qpid.server.security.access.Operation;
public class CompoundAccessControl implements AccessControl<CompoundSecurityToken>
@@ -79,7 +79,7 @@ public class CompoundAccessControl imple
@Override
public Result authorise(final CompoundSecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject)
+ final PermissionedObject configuredObject)
{
return authorise(token, operation, configuredObject, Collections.<String,Object>emptyMap());
}
@@ -87,14 +87,14 @@ public class CompoundAccessControl imple
@Override
public Result authorise(final CompoundSecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject,
+ final PermissionedObject configuredObject,
final Map<String, Object> arguments)
{
List<AccessControl<?>> underlying = _underlyingControls.get();
Map<AccessControl<?>, SecurityToken> compoundToken = token == null ? null : token.getCompoundToken(underlying);
for(AccessControl control : underlying)
{
- SecurityToken underlyingToken = compoundToken == null ? null : compoundToken.get(underlying);
+ SecurityToken underlyingToken = compoundToken == null ? null : compoundToken.get(control);
final Result result = control.authorise(underlyingToken, operation, configuredObject, arguments);
if(result.isFinal())
{
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Fri Oct 21 09:32:07 2016
@@ -35,21 +35,20 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
-public class TrustStoreMessageSource extends AbstractSystemMessageSource implements MessageSource
+public class TrustStoreMessageSource extends AbstractSystemMessageSource
{
private static final Logger LOGGER = LoggerFactory.getLogger(TrustStoreMessageSource.class);
@@ -63,7 +62,7 @@ public class TrustStoreMessageSource ext
super(getSourceNameFromTrustStore(trustStore), virtualHost);
_virtualHost = virtualHost;
_trustStore = trustStore;
- _trustStore.addChangeListener(new ConfigurationChangeListener()
+ _trustStore.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
@@ -75,18 +74,6 @@ public class TrustStoreMessageSource ext
}
@Override
- public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
@@ -95,17 +82,7 @@ public class TrustStoreMessageSource ext
updateCertCache();
}
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
});
if(_trustStore.getState() == State.ACTIVE)
{
@@ -114,15 +91,16 @@ public class TrustStoreMessageSource ext
}
@Override
- public Consumer addConsumer(final ConsumerTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+ public SystemMessageSourceConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<ConsumerOption> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
- final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
+ final SystemMessageSourceConsumer
+ consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
consumer.send(createMessage());
target.queueEmpty();
return consumer;
@@ -141,7 +119,7 @@ public class TrustStoreMessageSource ext
{
InternalMessage message = createMessage();
- for(Consumer c : new ArrayList<>(getConsumers()))
+ for(SystemMessageSourceConsumer c : new ArrayList<>(getConsumers()))
{
c.send(message);
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java Fri Oct 21 09:32:07 2016
@@ -22,8 +22,8 @@ package org.apache.qpid.server.security;
import java.util.Collection;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TrustStore;
@@ -58,12 +58,8 @@ public class TrustStoreMessageSourceCrea
updateTrustStoreSourceRegistration(registry, trustStore);
trustStore.addChangeListener(trustStoreChangeListener);
}
- broker.addChangeListener(new ConfigurationChangeListener()
+ broker.addChangeListener(new AbstractConfigurationChangeListener()
{
- @Override
- public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
- {
- }
@Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
@@ -90,26 +86,6 @@ public class TrustStoreMessageSourceCrea
}
}
- @Override
- public void attributeSet(final ConfiguredObject<?> object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
-
- }
-
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
});
}
@@ -141,7 +117,7 @@ public class TrustStoreMessageSourceCrea
}
}
- private class TrustStoreChangeListener implements ConfigurationChangeListener
+ private class TrustStoreChangeListener extends AbstractConfigurationChangeListener
{
private final SystemNodeRegistry _registry;
@@ -159,19 +135,6 @@ public class TrustStoreMessageSourceCrea
updateTrustStoreSourceRegistration(_registry, (TrustStore<?>)object);
}
-
- @Override
- public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
@@ -181,16 +144,5 @@ public class TrustStoreMessageSourceCrea
updateTrustStoreSourceRegistration(_registry, (TrustStore<?>)object);
}
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
}
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Fri Oct 21 09:32:07 2016
@@ -36,13 +36,12 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
@@ -627,13 +626,8 @@ public class VirtualHostStoreUpgraderAnd
}
});
}
- _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
+ _virtualHostNode.addChangeListener(new AbstractConfigurationChangeListener()
{
- @Override
- public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
- {
-
- }
@Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
@@ -665,26 +659,6 @@ public class VirtualHostStoreUpgraderAnd
}
}
- @Override
- public void attributeSet(final ConfiguredObject<?> object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
-
- }
-
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
});
if(isNew)
{
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+
+public class OutboundTransferDestination implements MessageDestination
+{
+ private final VirtualHost<?> _virtualHost;
+ private final String _address;
+
+ public OutboundTransferDestination(final VirtualHost<?> virtualHost, final String address)
+ {
+ _virtualHost = virtualHost;
+ _address = address;
+ }
+
+ @Override
+ public String getName()
+ {
+ return "$transfer";
+ }
+
+ @Override
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super BaseMessageInstance> postEnqueueAction)
+ {
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new VirtualHostUnavailableException(this._virtualHost);
+ }
+
+
+ final TransferQueue transferQueue = _virtualHost.getTransferQueue();
+ txn.enqueue(transferQueue, message, new ServerTransaction.EnqueueAction()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit(MessageEnqueueRecord... records)
+ {
+ try
+ {
+ for (final MessageEnqueueRecord record : records)
+ {
+ transferQueue.enqueue(message, postEnqueueAction, record);
+ }
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java Fri Oct 21 09:32:07 2016
@@ -19,10 +19,12 @@
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.transfer;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.server.queue.QueueEntry;
+
final class QueueContext
{
private volatile QueueEntry _lastSeenEntry;
@@ -31,11 +33,11 @@ final class QueueContext
static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
_lastSeenUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+ (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
_releasedUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_releasedEntry");
+ (QueueContext.class, QueueEntry.class, "_releasedEntry");
public QueueContext(QueueEntry head)
{
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.queue.RecoverableBaseQueue;
+
+public interface TransferQueue extends RecoverableBaseQueue, MessageDestination
+{
+ TransferQueueConsumer addConsumer(TransferTarget target,
+ String consumerName);
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.regex.Pattern;
+
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public class TransferQueueConsumer implements AcquiringMessageInstanceConsumer<TransferQueueConsumer, TransferTarget>
+{
+ private final TransferQueueImpl _transferQueue;
+ private final TransferTarget _target;
+ private final String _name;
+
+ private final ConcurrentLinkedQueue<TransferQueueEntry> _entries = new ConcurrentLinkedQueue<>();
+ private final Pattern _matchPattern;
+ private volatile QueueContext _queueContext;
+
+
+ private final MessageInstance.StealableConsumerAcquiredState<TransferQueueConsumer>
+ _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
+ private final Object _identifier = new Object();
+
+
+ TransferQueueConsumer(final TransferQueueImpl transferQueue,
+ final TransferTarget target,
+ final String consumerName)
+ {
+ _transferQueue = transferQueue;
+ _target = target;
+ _name = consumerName;
+
+ Collection<String> globalAddressDomains = _target.getGlobalAddressDomains();
+ StringBuilder matchPattern = new StringBuilder();
+ boolean isFirst = true;
+ for(String domain : globalAddressDomains)
+ {
+ if(isFirst)
+ {
+ isFirst = false;
+ }
+ else
+ {
+ matchPattern.append('|');
+ }
+ matchPattern.append('(');
+ matchPattern.append(Pattern.quote(domain.endsWith("/") ? domain : (domain + "/")));
+ matchPattern.append(".*)");
+ }
+ _matchPattern = Pattern.compile(matchPattern.toString());
+ }
+
+ boolean hasInterest(final QueueEntry entry)
+ {
+ String initialRoutingAddress = entry.getMessage().getInitialRoutingAddress();
+ boolean matches = _matchPattern.matcher(initialRoutingAddress).matches();
+ return matches;
+ }
+
+ public boolean processPending()
+ {
+ if(!isSuspended())
+ {
+ TransferQueueEntry entry = _transferQueue.getNextAvailableEntry(this);
+ if(entry != null && !wouldSuspend(entry))
+ {
+ if (!entry.acquire(this))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this consumer
+ restoreCredit(entry);
+ }
+ else
+ {
+ _transferQueue.setLastSeenEntry(this, entry);
+
+ send(entry);
+ return true;
+ }
+ }
+
+ }
+ return false;
+ }
+
+ void setQueueContext(final QueueContext queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+ QueueContext getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ @Override
+ public MessageInstance.StealableConsumerAcquiredState<TransferQueueConsumer> getOwningState()
+ {
+ return _owningState;
+ }
+
+ public boolean isSuspended()
+ {
+ return _target.isSuspended();
+ }
+
+ boolean wouldSuspend(final TransferQueueEntry entry)
+ {
+ return _target.wouldSuspend(entry);
+ }
+
+ public TransferTarget getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public void acquisitionRemoved(final QueueEntry queueEntry)
+ {
+
+ }
+
+ @Override
+ public long getConsumerNumber()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean resend(final QueueEntry queueEntry)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ @Override
+ public String getName()
+ {
+ return "$transfer";
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void externalStateChange()
+ {
+
+ }
+
+ @Override
+ public Object getIdentifier()
+ {
+ return _identifier;
+ }
+
+ public void send(final TransferQueueEntry entry)
+ {
+ _target.send(entry);
+ }
+
+ public void restoreCredit(final TransferQueueEntry entry)
+ {
+ _target.restoreCredit(entry.getMessage());
+ }
+
+ public void notifyWork()
+ {
+ _target.notifyWork();
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.OrderedQueueEntry;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+
+public class TransferQueueEntry extends OrderedQueueEntry
+{
+
+ protected TransferQueueEntry(final TransferQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public TransferQueueEntry(final TransferQueueEntryList queueEntryList,
+ final ServerMessage message,
+ final MessageEnqueueRecord messageEnqueueRecord)
+ {
+ super(queueEntryList, message, messageEnqueueRecord);
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.OrderedBaseQueueEntryList;
+import org.apache.qpid.server.queue.OrderedQueueEntry;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryList;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public class TransferQueueEntryList extends OrderedBaseQueueEntryList<TransferQueue>
+{
+
+ private static final HeadCreator HEAD_CREATOR = new HeadCreator()
+ {
+ @Override
+ public TransferQueueEntry createHead(final QueueEntryList list)
+ {
+ return new TransferQueueEntry((TransferQueueEntryList) list);
+ }
+ };
+
+ public TransferQueueEntryList(final TransferQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+
+ @Override
+ protected OrderedQueueEntry createQueueEntry(final ServerMessage<?> message,
+ final MessageEnqueueRecord enqueueRecord)
+ {
+ return new TransferQueueEntry(this, message, enqueueRecord);
+ }
+
+ @Override
+ public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+ {
+
+ }
+
+ @Override
+ public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+ {
+
+ }
+
+ @Override
+ public void requeue(final QueueEntry queueEntry)
+ {
+
+ }
+
+ @Override
+ public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+ {
+ return false;
+ }
+
+ @Override
+ public void dequeue(final QueueEntry queueEntry)
+ {
+
+ }
+
+ @Override
+ public int routeToAlternate(final QueueEntry queueEntry,
+ final Action<? super BaseMessageInstance> action,
+ final ServerTransaction txn)
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMaximumDeliveryAttempts()
+ {
+ return 0;
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,413 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+
+public class TransferQueueImpl implements TransferQueue
+{
+ private static final UUID TRANSFER_QUEUE_ID = UUID.nameUUIDFromBytes("$transfer".getBytes(StandardCharsets.UTF_8));
+
+ private static final int RECOVERING = 1;
+ private static final int COMPLETING_RECOVERY = 2;
+ private static final int RECOVERED = 3;
+
+ private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
+ private final AtomicInteger _enqueuingWhileRecovering = new AtomicInteger(0);
+
+ private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
+
+ private final VirtualHost<?> _virtualHost;
+
+ private final TransferQueueEntryList _queueEntryList;
+ private Collection<TransferQueueConsumer> _consumers = new CopyOnWriteArrayList<>();
+
+
+ public TransferQueueImpl(final VirtualHost<?> virtualHost)
+ {
+ _virtualHost = virtualHost;
+ _queueEntryList = new TransferQueueEntryList(this);
+ }
+
+ @Override
+ public void enqueue(final ServerMessage message,
+ final Action<? super BaseMessageInstance> action,
+ final MessageEnqueueRecord enqueueRecord)
+ {
+ if(_recovering.get() != RECOVERED)
+ {
+ _enqueuingWhileRecovering.incrementAndGet();
+
+ boolean addedToRecoveryQueue;
+ try
+ {
+ if(addedToRecoveryQueue = (_recovering.get() == RECOVERING))
+ {
+ _postRecoveryQueue.add(new EnqueueRequest(message, action, enqueueRecord));
+ }
+ }
+ finally
+ {
+ _enqueuingWhileRecovering.decrementAndGet();
+ }
+
+ if(!addedToRecoveryQueue)
+ {
+ while(_recovering.get() != RECOVERED)
+ {
+ Thread.yield();
+ }
+ doEnqueue(message, action, enqueueRecord);
+ }
+ }
+ else
+ {
+ doEnqueue(message, action, enqueueRecord);
+ }
+
+
+ }
+
+ @Override
+ public void recover(final ServerMessage<?> message, final MessageEnqueueRecord enqueueRecord)
+ {
+ doEnqueue(message, null, enqueueRecord);
+ }
+
+ @Override
+ public final void completeRecovery()
+ {
+ if(_recovering.compareAndSet(RECOVERING, COMPLETING_RECOVERY))
+ {
+ while(_enqueuingWhileRecovering.get() != 0)
+ {
+ Thread.yield();
+ }
+
+ // at this point we can assert that any new enqueue to the queue will not try to put into the post recovery
+ // queue (because the state is no longer RECOVERING, but also no threads are currently trying to enqueue
+ // because the _enqueuingWhileRecovering count is 0.
+
+ enqueueFromPostRecoveryQueue();
+
+ _recovering.set(RECOVERED);
+
+ }
+ }
+
+ @Override
+ public VirtualHost<?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ private void enqueueFromPostRecoveryQueue()
+ {
+ while(!_postRecoveryQueue.isEmpty())
+ {
+ EnqueueRequest request = _postRecoveryQueue.poll();
+ MessageReference<?> messageReference = request.getMessage();
+ doEnqueue(messageReference.getMessage(), request.getAction(), request.getEnqueueRecord());
+ messageReference.release();
+ }
+ }
+
+
+
+ protected void doEnqueue(final ServerMessage message, final Action<? super BaseMessageInstance> action, MessageEnqueueRecord enqueueRecord)
+ {
+ final TransferQueueEntry entry = (TransferQueueEntry) _queueEntryList.add(message, enqueueRecord);
+ for (TransferQueueConsumer consumer : getConsumers())
+ {
+ if (consumer.hasInterest(entry))
+ {
+ consumer.notifyWork();
+ }
+ }
+ }
+
+ private Collection<TransferQueueConsumer> getConsumers()
+ {
+ return _consumers;
+ }
+
+ @Override
+ public TransferQueueConsumer addConsumer(final TransferTarget target,
+ final String consumerName)
+ {
+
+
+
+ TransferQueueConsumer consumer = new TransferQueueConsumer(this,
+ target,
+ consumerName);
+
+ QueueContext queueContext = new QueueContext(_queueEntryList.getHead());
+ consumer.setQueueContext(queueContext);
+ _consumers.add(consumer);
+ consumer.notifyWork();
+
+
+ /* consumer.setStateListener(this);
+ QueueContext queueContext;
+ if(filters == null || !filters.startAtTail())
+ {
+ queueContext = new QueueContext(getEntries().getHead());
+ }
+ else
+ {
+ queueContext = new QueueContext(getEntries().getTail());
+ }
+ consumer.setQueueContext(queueContext);
+
+ if (!isDeleted())
+ {
+ _consumerList.add(consumer);
+
+ if (isDeleted())
+ {
+ consumer.queueDeleted();
+ }
+ }
+ else
+ {
+ // TODO
+ }
+
+ consumer.addChangeListener(_deletedChildListener);
+
+ deliverAsync();
+*/
+ return consumer;
+ }
+
+
+ void setLastSeenEntry(final TransferQueueConsumer sub, final TransferQueueEntry entry)
+ {
+ QueueContext subContext = sub.getQueueContext();
+ if (subContext != null)
+ {
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
+ {
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ }
+ }
+ }
+
+ TransferQueueEntry getNextAvailableEntry(final TransferQueueConsumer sub)
+ {
+ QueueContext context = sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry lastSeen = context.getLastSeenEntry();
+ QueueEntry releasedNode = context.getReleasedEntry();
+
+ TransferQueueEntry node =
+ (TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
+ ? releasedNode : _queueEntryList.next(lastSeen));
+
+ boolean expired = false;
+ while (node != null
+ && (!node.isAvailable()
+ || (expired = node.expired())
+ || !sub.hasInterest(node)))
+ {
+ if (expired)
+ {
+ expired = false;
+ if (node.acquire())
+ {
+ dequeueEntry(node);
+ }
+ }
+
+ if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+ {
+ QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+ }
+
+ lastSeen = context.getLastSeenEntry();
+ releasedNode = context.getReleasedEntry();
+ node = (TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
+ ? releasedNode
+ : _queueEntryList.next(lastSeen));
+ }
+ return node;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private void dequeueEntry(final QueueEntry node)
+ {
+ ServerTransaction txn = new AutoCommitTransaction(_virtualHost.getMessageStore());
+ dequeueEntry(node, txn);
+ }
+
+ private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+ {
+ txn.dequeue(node.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ node.delete();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isDeleted()
+ {
+ return false;
+ }
+
+ @Override
+ public String getName()
+ {
+ return "$transfer";
+ }
+
+ @Override
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super BaseMessageInstance> postEnqueueAction)
+ {
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new VirtualHostUnavailableException(this._virtualHost);
+ }
+
+ if(!message.isReferenced(this))
+ {
+ txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit(MessageEnqueueRecord... records)
+ {
+ try
+ {
+ TransferQueueImpl.this.enqueue(message, postEnqueueAction, records[0]);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
+
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return TRANSFER_QUEUE_ID;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+
+ private static class EnqueueRequest
+ {
+ private final MessageReference<?> _message;
+ private final Action<? super BaseMessageInstance> _action;
+ private final MessageEnqueueRecord _enqueueRecord;
+
+ public EnqueueRequest(final ServerMessage message,
+ final Action<? super BaseMessageInstance> action,
+ final MessageEnqueueRecord enqueueRecord)
+ {
+ _enqueueRecord = enqueueRecord;
+ _message = message.newReference();
+ _action = action;
+ }
+
+ public MessageReference<?> getMessage()
+ {
+ return _message;
+ }
+
+ public Action<? super BaseMessageInstance> getAction()
+ {
+ return _action;
+ }
+
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _enqueueRecord;
+ }
+ }
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java Fri Oct 21 09:32:07 2016
@@ -18,74 +18,24 @@
* under the License.
*
*/
-package org.apache.qpid.server.consumer;
+package org.apache.qpid.server.transfer;
+
+import java.util.Collection;
-import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
-public interface ConsumerTarget
+public interface TransferTarget
{
+ void notifyWork();
- void acquisitionRemoved(MessageInstance node);
-
- void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
- boolean processPending();
-
- boolean hasPendingWork();
-
- String getTargetAddress();
-
- boolean hasCredit();
-
- enum State
- {
- ACTIVE, SUSPENDED, CLOSED
- }
-
- State getState();
-
- void consumerAdded(ConsumerImpl sub);
-
- void consumerRemoved(ConsumerImpl sub);
-
- void notifyCurrentState();
-
- void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
-
- long getUnacknowledgedBytes();
+ Collection<String> getGlobalAddressDomains();
- long getUnacknowledgedMessages();
+ void send(TransferQueueEntry entry);
- AMQSessionModel getSessionModel();
+ void restoreCredit(ServerMessage message);
- long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
-
- boolean hasMessagesToSend();
-
- void sendNextMessage();
-
- void flushBatched();
-
- void queueDeleted();
-
- void queueEmpty();
-
- boolean allocateCredit(ServerMessage msg);
-
- void restoreCredit(ServerMessage queueEntry);
+ boolean wouldSuspend(TransferQueueEntry entry);
boolean isSuspended();
-
- boolean close();
-
- boolean trySendLock();
-
- void getSendLock();
-
- void releaseSendLock();
-
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Oct 21 09:32:07 2016
@@ -331,18 +331,18 @@ public abstract class AbstractAMQPConnec
@Override
public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
- if(_network instanceof NonBlockingConnection)
+ if(_network instanceof NonBlockingInboundConnection)
{
- ((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
+ ((NonBlockingInboundConnection) _network).pushScheduler(networkConnectionScheduler);
}
}
@Override
public NetworkConnectionScheduler popScheduler()
{
- if(_network instanceof NonBlockingConnection)
+ if(_network instanceof NonBlockingInboundConnection)
{
- return ((NonBlockingConnection) _network).popScheduler();
+ return ((NonBlockingInboundConnection) _network).popScheduler();
}
return null;
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Oct 21 09:32:07 2016
@@ -113,7 +113,7 @@ public class NetworkConnectionScheduler
}
}
- void processConnection(final NonBlockingConnection connection)
+ void processConnection(final SchedulableConnection connection)
{
Thread.currentThread().setName(connection.getThreadName());
connection.doPreWork();
@@ -200,12 +200,12 @@ public class NetworkConnectionScheduler
_selectorThread.cancelAcceptingSocket(serverSocket);
}
- public void addConnection(final NonBlockingConnection connection)
+ public void addConnection(final SchedulableConnection connection)
{
_selectorThread.addConnection(connection);
}
- public void removeConnection(final NonBlockingConnection connection)
+ public void removeConnection(final SchedulableConnection connection)
{
_selectorThread.removeConnection(connection);
}
@@ -215,7 +215,7 @@ public class NetworkConnectionScheduler
return _poolSize;
}
- public void schedule(final NonBlockingConnection connection)
+ public void schedule(final SchedulableConnection connection)
{
_selectorThread.addToWork(connection);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org