You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:32 UTC
[05/15] activemq-artemis git commit: ARTEMIS-751 Simplification of
the AMQP implementation
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
deleted file mode 100644
index 576e61a..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.logger;
-
-import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-import org.proton.plug.exceptions.ActiveMQAMQPInvalidFieldException;
-import org.jboss.logging.annotations.Message;
-import org.jboss.logging.annotations.MessageBundle;
-import org.jboss.logging.Messages;
-import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException;
-
-/**
- * Logger Code 21 (NOTE(review): derived from the 219xxx ids declared below; confirm against the project's code registry)
- * <p>
- * Each message id must be 6 digits long starting with 21, the 3rd digit should be 9. So the range
- * is from 219000 to 219999.
- * <p>
- * Once released, methods should not be deleted as they may be referenced by knowledge base
- * articles. Unused methods should be marked as deprecated.
- */
-@MessageBundle(projectCode = "AMQ")
-public interface ActiveMQAMQPProtocolMessageBundle {
-
- ActiveMQAMQPProtocolMessageBundle BUNDLE = Messages.getBundle(ActiveMQAMQPProtocolMessageBundle.class);
-
- @Message(id = 219000, value = "target address not set")
- ActiveMQAMQPInvalidFieldException targetAddressNotSet();
-
- @Message(id = 219001, value = "error creating temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPInternalErrorException errorCreatingTemporaryQueue(String message);
-
- @Message(id = 219002, value = "target address does not exist")
- ActiveMQAMQPNotFoundException addressDoesntExist();
-
- @Message(id = 219003, value = "error finding temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPNotFoundException errorFindingTemporaryQueue(String message);
-
- @Message(id = 219005, value = "error creating consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPInternalErrorException errorCreatingConsumer(String message);
-
- @Message(id = 219006, value = "error starting consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException errorStartingConsumer(String message);
-
- @Message(id = 219007, value = "error acknowledging message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException errorAcknowledgingMessage(String messageID, String message);
-
- @Message(id = 219008, value = "error cancelling message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException errorCancellingMessage(String messageID, String message);
-
- @Message(id = 219010, value = "source address does not exist")
- ActiveMQAMQPNotFoundException sourceAddressDoesntExist();
-
- @Message(id = 219011, value = "source address not set")
- ActiveMQAMQPInvalidFieldException sourceAddressNotSet();
-
- @Message(id = 219012, value = "error rolling back coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException errorRollingbackCoordinator(String message);
-
- @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message);
-
- @Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
deleted file mode 100644
index d52df40..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.sasl;
-
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-
-/**
- * {@link ServerSASL} mechanism advertised under the name {@code ANONYMOUS}: every exchange is
- * reported as successful and the result carries neither a user name nor a password.
- */
-public class AnonymousServerSASL implements ServerSASL {
-
-   /** Mechanism name returned by {@link #getName()}. */
-   private static final String MECHANISM = "ANONYMOUS";
-
-   /** Creates the mechanism; it holds no state. */
-   public AnonymousServerSASL() {
-      super();
-   }
-
-   /**
-    * @return the literal mechanism name {@code "ANONYMOUS"}
-    */
-   @Override
-   public String getName() {
-      return MECHANISM;
-   }
-
-   /**
-    * Ignores the supplied payload and always answers with a successful result.
-    *
-    * @param bytes the payload sent by the peer; not inspected
-    * @return a successful {@link PlainSASLResult} whose user and password are both {@code null}
-    */
-   @Override
-   public SASLResult processSASL(byte[] bytes) {
-      final SASLResult accepted = new PlainSASLResult(true, null, null);
-      return accepted;
-   }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
deleted file mode 100644
index 59685ad..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.sasl;
-
-import org.proton.plug.ClientSASL;
-
-/**
- * Client side of the SASL {@code PLAIN} mechanism, configured with just a user name and a password.
- * <p>
- * TODO: this interface will probably change as we are challenged with more SASL cases where there
- * is a communication between client and server to determine the authentication
- */
-public class ClientSASLPlain implements ClientSASL {
-
-   private final String username;
-   private final String password;
-
-   /**
-    * @param user     the user name; {@code null} is treated as the empty string
-    * @param password the password; {@code null} is treated as the empty string
-    */
-   public ClientSASLPlain(String user, String password) {
-      // Normalise once here so getBytes() does not have to rewrite the fields on every call.
-      this.username = user == null ? "" : user;
-      this.password = password == null ? "" : password;
-   }
-
-   @Override
-   public String getName() {
-      return "PLAIN";
-   }
-
-   /**
-    * Builds the PLAIN payload: one zero byte, the user name bytes, one zero byte, the password bytes.
-    *
-    * @return a newly allocated array of {@code username.length + password.length + 2} bytes
-    */
-   @Override
-   public byte[] getBytes() {
-      // Encode explicitly as UTF-8 (the encoding RFC 4616 prescribes for PLAIN) instead of the
-      // platform default charset, so the bytes do not depend on the JVM's locale settings.
-      byte[] usernameBytes = username.getBytes(java.nio.charset.StandardCharsets.UTF_8);
-      byte[] passwordBytes = password.getBytes(java.nio.charset.StandardCharsets.UTF_8);
-
-      // data[0] and data[1 + usernameBytes.length] keep their default value 0 and act as separators.
-      byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
-      System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
-      System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
-      return data;
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
deleted file mode 100644
index fe33886..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.sasl;
-
-import org.proton.plug.SASLResult;
-
-/**
- * Immutable {@link SASLResult} holding the outcome flag together with the user name and password
- * it was created with.
- */
-public class PlainSASLResult implements SASLResult {
-
-   private final boolean success;
-   private final String user;
-   private final String password;
-
-   /**
-    * @param success  value later returned by {@link #isSuccess()}
-    * @param user     value later returned by {@link #getUser()}; may be {@code null}
-    * @param password value later returned by {@link #getPassword()}; may be {@code null}
-    */
-   public PlainSASLResult(boolean success, String user, String password) {
-      this.password = password;
-      this.user = user;
-      this.success = success;
-   }
-
-   @Override
-   public boolean isSuccess() {
-      return this.success;
-   }
-
-   @Override
-   public String getUser() {
-      return this.user;
-   }
-
-   /**
-    * @return the password passed to the constructor
-    */
-   public String getPassword() {
-      return this.password;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
deleted file mode 100644
index 37c8dd3..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.sasl;
-
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-
-/**
- * Server side of the SASL {@code PLAIN} mechanism: splits the payload on NUL bytes into a user name
- * and password and hands them to {@link #authenticate(String, String)}.
- */
-public class ServerSASLPlain implements ServerSASL {
-
-   public static final String NAME = "PLAIN";
-
-   @Override
-   public String getName() {
-      return NAME;
-   }
-
-   /**
-    * Parses the PLAIN payload and authenticates the extracted credentials.
-    * <p>
-    * A leading empty field is skipped; the next field is taken as the user name and the one after
-    * it as the password. A field that is absent from the payload is passed on as {@code null}.
-    *
-    * @param data the raw payload received from the peer
-    * @return a {@link PlainSASLResult} with the authentication outcome and the parsed credentials
-    */
-   @Override
-   public SASLResult processSASL(byte[] data) {
-
-      String username = null;
-      String password = null;
-      // Decode explicitly as UTF-8 (the encoding RFC 4616 prescribes for PLAIN) rather than the
-      // platform default charset.
-      String bytes = new String(data, java.nio.charset.StandardCharsets.UTF_8);
-      // String.split drops trailing empty strings, so an empty password simply yields no element.
-      String[] credentials = bytes.split(Character.toString((char) 0));
-      int offSet = 0;
-      if (credentials.length > 0) {
-         // NOTE(review): a non-empty first field is used as the user name here; confirm how a
-         // payload that carries an authorization id should be handled.
-         if (credentials[0].length() == 0) {
-            offSet = 1;
-         }
-
-         // Strict '>' comparisons: the previous '>=' checks indexed one past the end of the array
-         // when the user name or password was missing (e.g. empty payload or empty password).
-         if (credentials.length > offSet) {
-            username = credentials[offSet];
-         }
-         if (credentials.length > offSet + 1) {
-            password = credentials[offSet + 1];
-         }
-      }
-
-      boolean success = authenticate(username, password);
-
-      return new PlainSASLResult(success, username, password);
-   }
-
-   /**
-    * Hook for subclasses to perform the authentication here. This base implementation accepts
-    * every combination.
-    *
-    * @param user     the parsed user name, or {@code null} if the payload did not contain one
-    * @param password the parsed password, or {@code null} if the payload did not contain one
-    * @return {@code true} if the credentials are accepted
-    */
-   protected boolean authenticate(String user, String password) {
-      return true;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
deleted file mode 100644
index e1e6944..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import org.jboss.logging.Logger;
-
-/**
- * Static helpers for converting bytes to and from hexadecimal text and for formatting that text.
- */
-public class ByteUtil {
-
-   /**
-    * When TRACE is enabled on {@code logger}, logs {@code message} followed by the readable bytes of
-    * {@code byteIn} as grouped hex. The reader index of {@code byteIn} is restored afterwards.
-    *
-    * @param logger  the logger to write to
-    * @param message text placed on the line before the hex dump
-    * @param byteIn  the buffer whose readable bytes are dumped
-    */
-   public static void debugFrame(Logger logger, String message, ByteBuf byteIn) {
-      if (logger.isTraceEnabled()) {
-         int location = byteIn.readerIndex();
-         // Size the copy by what can actually be read. Using writerIndex() here made readBytes
-         // fail whenever the reader index was already past zero.
-         byte[] frame = new byte[byteIn.readableBytes()];
-         byteIn.readBytes(frame);
-
-         try {
-            logger.trace(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16));
-         }
-         catch (Exception e) {
-            logger.warn(e.getMessage(), e);
-         }
-
-         // readBytes advanced the reader index; put it back so the caller sees an untouched buffer.
-         byteIn.readerIndex(location);
-      }
-   }
-
-   /**
-    * Formats {@code str} as a concatenation of quoted string literals, each line prefixed with a
-    * numbered comment marker.
-    *
-    * @param str       the text to split up
-    * @param groupSize number of characters per quoted group
-    * @param lineBreak a new output line is started whenever the end offset of a group is a multiple
-    *                  of this value
-    * @return the formatted text, terminated by {@code ";}
-    */
-   public static String formatGroup(String str, int groupSize, int lineBreak) {
-      // Purely local accumulator, so the unsynchronized StringBuilder is sufficient.
-      StringBuilder buffer = new StringBuilder();
-
-      int line = 1;
-      buffer.append("/* 1 */ \"");
-      for (int i = 0; i < str.length(); i += groupSize) {
-         buffer.append(str, i, Math.min(str.length(), i + groupSize));
-
-         if ((i + groupSize) % lineBreak == 0) {
-            buffer.append("\" +\n/* ");
-            line++;
-            if (line < 10) {
-               buffer.append(" ");
-            }
-            buffer.append(line).append(" */ \"");
-         }
-         // i is always a multiple of groupSize, so only "more text follows" needs checking.
-         else if (str.length() - i > groupSize) {
-            buffer.append("\" + \"");
-         }
-      }
-
-      buffer.append("\";");
-
-      return buffer.toString();
-
-   }
-
-   protected static final char[] hexArray = "0123456789ABCDEF".toCharArray();
-
-   /**
-    * @param bytes the bytes to convert
-    * @return two upper-case hex digits per input byte, with no separators
-    */
-   public static String bytesToHex(byte[] bytes) {
-      char[] hexChars = new char[bytes.length * 2];
-      for (int j = 0; j < bytes.length; j++) {
-         int v = bytes[j] & 0xFF;
-         hexChars[j * 2] = hexArray[v >>> 4];
-         hexChars[j * 2 + 1] = hexArray[v & 0x0F];
-      }
-      return new String(hexChars);
-   }
-
-   /**
-    * Converts hex text back to bytes, two characters per byte.
-    *
-    * @param s hex text; the loop reads characters in pairs, so an even length is expected
-    * @return an array of {@code s.length() / 2} bytes
-    */
-   public static byte[] hexStringToByteArray(String s) {
-      int len = s.length();
-      byte[] data = new byte[len / 2];
-      for (int i = 0; i < len; i += 2) {
-         data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));
-      }
-      return data;
-   }
-
-   /**
-    * @param x the value to serialise
-    * @return the backing array of an 8-byte heap buffer after {@code x} was written with {@code writeLong}
-    */
-   public static byte[] longToBytes(long x) {
-      ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8);
-      buffer.writeLong(x);
-      return buffer.array();
-   }
-
-   /**
-    * Shortens {@code value} by keeping its first and last {@code size / 2} characters around
-    * {@code " ... "}.
-    *
-    * @param value the text to shorten
-    * @param size  the length up to which {@code value} is returned unchanged
-    * @return {@code value} itself when it has at most {@code size} characters, otherwise the
-    * shortened form
-    */
-   public static String maxString(String value, int size) {
-      // '<=' so that a string of exactly 'size' characters is not "shortened" into a longer one.
-      if (value.length() <= size) {
-         return value;
-      }
-      else {
-         return value.substring(0, size / 2) + " ... " + value.substring(value.length() - size / 2);
-      }
-   }
-
-   /**
-    * @param bytes     the bytes to convert
-    * @param groupSize number of bytes per group
-    * @return two upper-case hex digits per byte with a single space between groups; the empty
-    * string for an empty array
-    */
-   public static String bytesToHex(byte[] bytes, int groupSize) {
-      char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
-      int outPos = 0;
-      for (int j = 0; j < bytes.length; j++) {
-         if (j > 0 && j % groupSize == 0) {
-            hexChars[outPos++] = ' ';
-         }
-         int v = bytes[j] & 0xFF;
-         hexChars[outPos++] = hexArray[v >>> 4];
-         hexChars[outPos++] = hexArray[v & 0x0F];
-      }
-      return new String(hexChars);
-   }
-
-   /**
-    * Counts the separators {@link #bytesToHex(byte[], int)} inserts between groups.
-    */
-   private static int numberOfGroups(byte[] bytes, int groupSize) {
-      // An empty array has no separators. Without this guard the decrement below produced -1 and
-      // the caller tried to allocate a negative-sized array.
-      if (bytes.length == 0) {
-         return 0;
-      }
-
-      int groups = bytes.length / groupSize;
-
-      if (bytes.length % groupSize == 0) {
-         groups--;
-      }
-
-      return groups;
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
deleted file mode 100644
index 014efb0..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-
-/**
- * Hands out a {@link DecoderImpl} / {@link EncoderImpl} pair that is created once per thread.
- */
-public class CodecCache {
-
-   /** A decoder and the encoder built on top of it, registered together with the AMQP defined types. */
-   private static class EncoderDecoderPair {
-
-      final DecoderImpl decoder;
-      final EncoderImpl encoder;
-
-      EncoderDecoderPair() {
-         decoder = new DecoderImpl();
-         encoder = new EncoderImpl(decoder);
-         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
-      }
-   }
-
-   /** Lazily creates one pair for each thread that asks for a codec. */
-   private static final ThreadLocal<EncoderDecoderPair> PER_THREAD_CODEC = new ThreadLocal<EncoderDecoderPair>() {
-      @Override
-      protected EncoderDecoderPair initialValue() {
-         return new EncoderDecoderPair();
-      }
-   };
-
-   /**
-    * @return the encoder belonging to the calling thread's pair
-    */
-   public static EncoderImpl getEncoder() {
-      final EncoderDecoderPair pair = PER_THREAD_CODEC.get();
-      return pair.encoder;
-   }
-
-   /**
-    * @return the decoder belonging to the calling thread's pair
-    */
-   public static DecoderImpl getDecoder() {
-      final EncoderDecoderPair pair = PER_THREAD_CODEC.get();
-      return pair.decoder;
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
deleted file mode 100644
index a175805..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- * Counting semaphore built on {@link AbstractQueuedSynchronizer} whose number of credits can also
- * be read and overwritten directly.
- */
-public class CreditsSemaphore {
-
-   /** Synchronizer whose state is the number of credits currently available. */
-   @SuppressWarnings("serial")
-   private static class Sync extends AbstractQueuedSynchronizer {
-
-      private Sync(int initial) {
-         setState(initial);
-      }
-
-      public int getCredits() {
-         return getState();
-      }
-
-      /**
-       * Takes {@code numberOfAqcquires} credits if that many are available.
-       *
-       * @return the credits left after the acquire, or -1 when there were not enough
-       */
-      @Override
-      public int tryAcquireShared(final int numberOfAqcquires) {
-         while (true) {
-            final int available = getState();
-            final int remaining = available - numberOfAqcquires;
-
-            if (remaining >= 0) {
-               if (compareAndSetState(available, remaining)) {
-                  return remaining;
-               }
-               // Lost the race against another update; read the state again.
-               continue;
-            }
-
-            // Not enough credits: give up only if the state did not change in the meantime.
-            if (available == getState()) {
-               return -1;
-            }
-         }
-      }
-
-      /**
-       * Adds {@code numberOfReleases} credits, retrying until the update goes through.
-       *
-       * @return always {@code true}
-       */
-      @Override
-      public boolean tryReleaseShared(final int numberOfReleases) {
-         int observed;
-         do {
-            observed = getState();
-         }
-         while (!compareAndSetState(observed, observed + numberOfReleases));
-         return true;
-      }
-
-      /**
-       * Overwrites the credit count, retrying until the update goes through.
-       */
-      public void setCredits(final int credits) {
-         int observed;
-         do {
-            observed = getState();
-         }
-         while (!compareAndSetState(observed, credits));
-
-         // Releasing zero credits wakes up threads that may be queued waiting for credits.
-         releaseShared(0);
-      }
-   }
-
-   private final Sync sync;
-
-   /**
-    * @param initialCredits the number of credits available from the start
-    */
-   public CreditsSemaphore(int initialCredits) {
-      this.sync = new Sync(initialCredits);
-   }
-
-   /** Takes one credit, waiting interruptibly until one can be taken. */
-   public void acquire() throws InterruptedException {
-      sync.acquireSharedInterruptibly(1);
-   }
-
-   /**
-    * @return {@code true} if one credit was taken without waiting
-    */
-   public boolean tryAcquire() {
-      final int left = sync.tryAcquireShared(1);
-      return left >= 0;
-   }
-
-   /** Gives back a single credit. */
-   public void release() throws InterruptedException {
-      sync.releaseShared(1);
-   }
-
-   /** Gives back {@code credits} credits. */
-   public void release(int credits) throws InterruptedException {
-      sync.releaseShared(credits);
-   }
-
-   /** Replaces the current credit count with {@code credits}. */
-   public void setCredits(int credits) {
-      sync.setCredits(credits);
-   }
-
-   /**
-    * @return the number of credits currently available
-    */
-   public int getCredits() {
-      return sync.getCredits();
-   }
-
-   /**
-    * @return whether any thread is queued on the underlying synchronizer
-    */
-   public boolean hasQueuedThreads() {
-      return sync.hasQueuedThreads();
-   }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
deleted file mode 100644
index ae98891..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-
-/**
- * Helpers for moving delivery data between a Proton {@link Receiver} and an array-backed
- * {@link ByteBuf}.
- */
-public class DeliveryUtil {
-
-   /**
-    * Drains {@code receiver} into {@code buffer}, writing straight into the buffer's backing array.
-    *
-    * @param receiver the receiver to read from
-    * @param buffer   the destination; its writer index is advanced by the bytes read
-    * @return the number of bytes appended to {@code buffer}
-    */
-   public static int readDelivery(Receiver receiver, ByteBuf buffer) {
-      final int start = buffer.writerIndex();
-      for (;;) {
-         final int received = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes());
-         if (received <= 0) {
-            break;
-         }
-         // recv wrote directly into the array, so move the writer index past the new bytes and
-         // make room for the next round.
-         buffer.writerIndex(buffer.writerIndex() + received);
-         buffer.ensureWritable(received);
-      }
-      return buffer.writerIndex() - start;
-   }
-
-   /**
-    * Decodes the readable bytes of {@code buffer} into a new message; the buffer's indexes are not moved.
-    *
-    * @param buffer the array-backed buffer holding the encoded message
-    * @return the decoded message
-    */
-   public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
-      final MessageImpl decoded = (MessageImpl) Message.Factory.create();
-      final int from = buffer.arrayOffset() + buffer.readerIndex();
-      decoded.decode(buffer.array(), from, buffer.readableBytes());
-      return decoded;
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
deleted file mode 100644
index 20095ff..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link Runnable} wrapped around a {@code ReusableLatch}: running it counts the latch down once,
- * and the remaining methods delegate to the latch directly.
- */
-public class FutureRunnable implements Runnable {
-
-   private final ReusableLatch latch;
-
-   /** Creates an instance whose latch starts at zero. */
-   public FutureRunnable() {
-      this(0);
-   }
-
-   /**
-    * @param initialIterations the initial count handed to the latch
-    */
-   public FutureRunnable(final int initialIterations) {
-      this.latch = new ReusableLatch(initialIterations);
-   }
-
-   /** Counts the latch down once. */
-   @Override
-   public void run() {
-      latch.countDown();
-   }
-
-   public void countDown() {
-      latch.countDown();
-   }
-
-   public void countUp() {
-      latch.countUp();
-   }
-
-   /**
-    * @return the latch's current count
-    */
-   public int getCount() {
-      final int pending = latch.getCount();
-      return pending;
-   }
-
-   /** Delegates to the latch's untimed {@code await()}. */
-   public void await() throws InterruptedException {
-      latch.await();
-   }
-
-   /**
-    * @return the result of the latch's {@code await(milliseconds)}
-    */
-   public boolean await(long milliseconds) throws InterruptedException {
-      final boolean completed = latch.await(milliseconds);
-      return completed;
-   }
-
-   /**
-    * @return the result of the latch's {@code await(timeWait, timeUnit)}
-    */
-   public boolean await(long timeWait, TimeUnit timeUnit) throws InterruptedException {
-      final boolean completed = latch.await(timeWait, timeUnit);
-      return completed;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
deleted file mode 100644
index 98afd30..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import java.nio.ByteBuffer;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.codec.WritableBuffer;
-
-/**
- * Adapts a Netty {@link ByteBuf} to Proton's {@link WritableBuffer}: writes go to the wrapped
- * buffer, the position is its writer index and the limit is its capacity.
- */
-public class NettyWritable implements WritableBuffer {
-
-   final ByteBuf nettyBuffer;
-
-   /**
-    * @param nettyBuffer the buffer every call is forwarded to
-    */
-   public NettyWritable(ByteBuf nettyBuffer) {
-      this.nettyBuffer = nettyBuffer;
-   }
-
-   // ---- single values -------------------------------------------------------------------------
-
-   @Override
-   public void put(byte b) {
-      this.nettyBuffer.writeByte(b);
-   }
-
-   @Override
-   public void putShort(short s) {
-      this.nettyBuffer.writeShort(s);
-   }
-
-   @Override
-   public void putInt(int i) {
-      this.nettyBuffer.writeInt(i);
-   }
-
-   @Override
-   public void putLong(long l) {
-      this.nettyBuffer.writeLong(l);
-   }
-
-   @Override
-   public void putFloat(float f) {
-      this.nettyBuffer.writeFloat(f);
-   }
-
-   @Override
-   public void putDouble(double d) {
-      this.nettyBuffer.writeDouble(d);
-   }
-
-   // ---- bulk writes ---------------------------------------------------------------------------
-
-   @Override
-   public void put(byte[] src, int offset, int length) {
-      this.nettyBuffer.writeBytes(src, offset, length);
-   }
-
-   @Override
-   public void put(ByteBuffer payload) {
-      this.nettyBuffer.writeBytes(payload);
-   }
-
-   // ---- position and limits -------------------------------------------------------------------
-
-   /**
-    * @return the wrapped buffer's writer index
-    */
-   @Override
-   public int position() {
-      return this.nettyBuffer.writerIndex();
-   }
-
-   /** Moves the wrapped buffer's writer index to {@code position}. */
-   @Override
-   public void position(int position) {
-      this.nettyBuffer.writerIndex(position);
-   }
-
-   /**
-    * @return the wrapped buffer's current capacity
-    */
-   @Override
-   public int limit() {
-      return this.nettyBuffer.capacity();
-   }
-
-   /**
-    * @return the distance between the writer index and the current capacity
-    */
-   @Override
-   public int remaining() {
-      final int capacity = this.nettyBuffer.capacity();
-      return capacity - this.nettyBuffer.writerIndex();
-   }
-
-   /**
-    * @return {@code true} while the writer index is below the current capacity
-    */
-   @Override
-   public boolean hasRemaining() {
-      final int writerIndex = this.nettyBuffer.writerIndex();
-      return writerIndex < this.nettyBuffer.capacity();
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
deleted file mode 100644
index c3fab5d..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.MessageError;
-import org.apache.qpid.proton.message.ProtonJMessage;
-
-/**
- * This is a serverMessage that won't deal with the body
- */
-public class ProtonServerMessage implements ProtonJMessage {
-
- private Header header;
- private DeliveryAnnotations deliveryAnnotations;
- private MessageAnnotations messageAnnotations;
- private Properties properties;
- private ApplicationProperties applicationProperties;
-
- // This should include a raw body of both footer and body
- private byte[] rawBody;
-
- private Section parsedBody;
- private Footer parsedFooter;
-
- private final int EOF = 0;
-
- // TODO: Enumerations maybe?
- private static final int HEADER_TYPE = 0x070;
- private static final int DELIVERY_ANNOTATIONS = 0x071;
- private static final int MESSAGE_ANNOTATIONS = 0x072;
- private static final int PROPERTIES = 0x073;
- private static final int APPLICATION_PROPERTIES = 0x074;
-
- /**
- * This will decode a ByteBuffer tha represents the entire message.
- * Set the limits around the parameter.
- *
- * @param buffer a limited buffer for the message
- */
- public void decode(ByteBuffer buffer) {
-
- DecoderImpl decoder = CodecCache.getDecoder();
-
- header = null;
- deliveryAnnotations = null;
- messageAnnotations = null;
- properties = null;
- applicationProperties = null;
- rawBody = null;
-
- decoder.setByteBuffer(buffer);
- try {
- int type = readType(buffer, decoder);
- if (type == HEADER_TYPE) {
- header = (Header) readSection(buffer, decoder);
- type = readType(buffer, decoder);
-
- }
-
- if (type == DELIVERY_ANNOTATIONS) {
- deliveryAnnotations = (DeliveryAnnotations) readSection(buffer, decoder);
- type = readType(buffer, decoder);
-
- }
-
- if (type == MESSAGE_ANNOTATIONS) {
- messageAnnotations = (MessageAnnotations) readSection(buffer, decoder);
- type = readType(buffer, decoder);
- }
-
- if (type == PROPERTIES) {
- properties = (Properties) readSection(buffer, decoder);
- type = readType(buffer, decoder);
-
- }
-
- if (type == APPLICATION_PROPERTIES) {
- applicationProperties = (ApplicationProperties) readSection(buffer, decoder);
- type = readType(buffer, decoder);
- }
-
- if (type != EOF) {
- rawBody = new byte[buffer.limit() - buffer.position()];
- buffer.get(rawBody);
- }
- }
- finally {
- decoder.setByteBuffer(null);
- }
-
- }
-
- public void encode(ByteBuffer buffer) {
- WritableBuffer writableBuffer = new WritableBuffer.ByteBufferWrapper(buffer);
- encode(writableBuffer);
- }
-
- @Override
- public int encode(WritableBuffer writableBuffer) {
- final int firstPosition = writableBuffer.position();
-
- EncoderImpl encoder = CodecCache.getEncoder();
- encoder.setByteBuffer(writableBuffer);
-
- try {
- if (header != null) {
- encoder.writeObject(header);
- }
- if (deliveryAnnotations != null) {
- encoder.writeObject(deliveryAnnotations);
- }
- if (messageAnnotations != null) {
- encoder.writeObject(messageAnnotations);
- }
- if (properties != null) {
- encoder.writeObject(properties);
- }
- if (applicationProperties != null) {
- encoder.writeObject(applicationProperties);
- }
-
- // It should write either the parsed one or the rawBody
- if (parsedBody != null) {
- encoder.writeObject(parsedBody);
- if (parsedFooter != null) {
- encoder.writeObject(parsedFooter);
- }
- }
- else if (rawBody != null) {
- writableBuffer.put(rawBody, 0, rawBody.length);
- }
-
- return writableBuffer.position() - firstPosition;
- }
- finally {
- encoder.setByteBuffer((WritableBuffer) null);
- }
- }
-
- private int readType(ByteBuffer buffer, DecoderImpl decoder) {
-
- int pos = buffer.position();
-
- if (!buffer.hasRemaining()) {
- return EOF;
- }
- try {
- if (buffer.get() != 0) {
- return EOF;
- }
- else {
- return ((Number) decoder.readObject()).intValue();
- }
- }
- finally {
- buffer.position(pos);
- }
- }
-
- private Section readSection(ByteBuffer buffer, DecoderImpl decoder) {
- if (buffer.hasRemaining()) {
- return (Section) decoder.readObject();
- }
- else {
- return null;
- }
- }
-
- // At the moment we only need encode implemented!!!
- @Override
- public boolean isDurable() {
- return false;
- }
-
- @Override
- public long getDeliveryCount() {
- return 0;
- }
-
- @Override
- public short getPriority() {
- return 0;
- }
-
- @Override
- public boolean isFirstAcquirer() {
- return false;
- }
-
- @Override
- public long getTtl() {
- return 0;
- }
-
- @Override
- public void setDurable(boolean durable) {
-
- }
-
- @Override
- public void setTtl(long ttl) {
-
- }
-
- @Override
- public void setDeliveryCount(long deliveryCount) {
-
- }
-
- @Override
- public void setFirstAcquirer(boolean firstAcquirer) {
-
- }
-
- @Override
- public void setPriority(short priority) {
-
- }
-
- @Override
- public Object getMessageId() {
- return null;
- }
-
- @Override
- public long getGroupSequence() {
- return 0;
- }
-
- @Override
- public String getReplyToGroupId() {
- return null;
- }
-
- @Override
- public long getCreationTime() {
- return 0;
- }
-
- @Override
- public String getAddress() {
- return null;
- }
-
- @Override
- public byte[] getUserId() {
- return new byte[0];
- }
-
- @Override
- public String getReplyTo() {
- return null;
- }
-
- @Override
- public String getGroupId() {
- return null;
- }
-
- @Override
- public String getContentType() {
- return null;
- }
-
- @Override
- public long getExpiryTime() {
- return 0;
- }
-
- @Override
- public Object getCorrelationId() {
- return null;
- }
-
- @Override
- public String getContentEncoding() {
- return null;
- }
-
- @Override
- public String getSubject() {
- return null;
- }
-
- @Override
- public void setGroupSequence(long groupSequence) {
-
- }
-
- @Override
- public void setUserId(byte[] userId) {
-
- }
-
- @Override
- public void setCreationTime(long creationTime) {
-
- }
-
- @Override
- public void setSubject(String subject) {
-
- }
-
- @Override
- public void setGroupId(String groupId) {
-
- }
-
- @Override
- public void setAddress(String to) {
-
- }
-
- @Override
- public void setExpiryTime(long absoluteExpiryTime) {
-
- }
-
- @Override
- public void setReplyToGroupId(String replyToGroupId) {
-
- }
-
- @Override
- public void setContentEncoding(String contentEncoding) {
-
- }
-
- @Override
- public void setContentType(String contentType) {
-
- }
-
- @Override
- public void setReplyTo(String replyTo) {
-
- }
-
- @Override
- public void setCorrelationId(Object correlationId) {
-
- }
-
- @Override
- public void setMessageId(Object messageId) {
-
- }
-
- @Override
- public Header getHeader() {
- return null;
- }
-
- @Override
- public DeliveryAnnotations getDeliveryAnnotations() {
- return null;
- }
-
- @Override
- public MessageAnnotations getMessageAnnotations() {
- return null;
- }
-
- @Override
- public Properties getProperties() {
- return null;
- }
-
- @Override
- public ApplicationProperties getApplicationProperties() {
- return null;
- }
-
- @Override
- public Section getBody() {
- return null;
- }
-
- @Override
- public Footer getFooter() {
- return null;
- }
-
- @Override
- public void setHeader(Header header) {
-
- }
-
- @Override
- public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
-
- }
-
- @Override
- public void setMessageAnnotations(MessageAnnotations messageAnnotations) {
-
- }
-
- @Override
- public void setProperties(Properties properties) {
-
- }
-
- @Override
- public void setApplicationProperties(ApplicationProperties applicationProperties) {
-
- }
-
- @Override
- public void setBody(Section body) {
-
- }
-
- @Override
- public void setFooter(Footer footer) {
-
- }
-
- @Override
- public int decode(byte[] data, int offset, int length) {
- return 0;
- }
-
- @Override
- public int encode(byte[] data, int offset, int length) {
- return 0;
- }
-
- @Override
- public void clear() {
-
- }
-
- @Override
- public MessageError getError() {
- return null;
- }
-
- @Override
- public int encode2(byte[] data, int offset, int length) {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
deleted file mode 100644
index beccc03..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.util;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
- * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
- * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
- * <p>It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish)</p>
- * <p>On ActiveMQ Artemis we have the requirement of increment and decrement a counter until the user fires a ready handler (commit). At that point we just act as a regular countDown.</p>
- * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
- * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
- */
-public class ReusableLatch {
-
- /**
- * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
- *
- * @see AbstractQueuedSynchronizer
- */
- @SuppressWarnings("serial")
- private static class CountSync extends AbstractQueuedSynchronizer {
-
- private CountSync(int count) {
- setState(count);
- }
-
- public int getCount() {
- return getState();
- }
-
- public void setCount(final int count) {
- setState(count);
- }
-
- @Override
- public int tryAcquireShared(final int numberOfAqcquires) {
- return getState() == 0 ? 1 : -1;
- }
-
- public void add() {
- for (;;) {
- int actualState = getState();
- int newState = actualState + 1;
- if (compareAndSetState(actualState, newState)) {
- return;
- }
- }
- }
-
- @Override
- public boolean tryReleaseShared(final int numberOfReleases) {
- for (;;) {
- int actualState = getState();
- if (actualState == 0) {
- return true;
- }
-
- int newState = actualState - numberOfReleases;
-
- if (newState < 0) {
- newState = 0;
- }
-
- if (compareAndSetState(actualState, newState)) {
- return newState == 0;
- }
- }
- }
- }
-
- private final CountSync control;
-
- public ReusableLatch() {
- this(0);
- }
-
- public ReusableLatch(final int count) {
- control = new CountSync(count);
- }
-
- public int getCount() {
- return control.getCount();
- }
-
- public void setCount(final int count) {
- control.setCount(count);
- }
-
- public void countUp() {
- control.add();
- }
-
- public void countDown() {
- control.releaseShared(1);
- }
-
- public void countDown(final int count) {
- control.releaseShared(count);
- }
-
- public void await() throws InterruptedException {
- control.acquireSharedInterruptibly(1);
- }
-
- public boolean await(final long milliseconds) throws InterruptedException {
- return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
- }
-
- public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
- return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
deleted file mode 100644
index 825b987..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import java.util.concurrent.Executors;
-
-import io.netty.buffer.ByteBuf;
-
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Session;
-import org.junit.Test;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.handler.EventHandler;
-
-public class AbstractConnectionContextTest {
-
- @Test
- public void testListenerDoesntThrowNPEWhenClosingLinkWithNullContext() throws Exception {
- TestConnectionContext connectionContext = new TestConnectionContext(new TestConnectionCallback());
- EventHandler listener = connectionContext.getListener();
-
- Connection protonConnection = Connection.Factory.create();
- Session protonSession = protonConnection.session();
- Link link = protonSession.receiver("link");
-
- link.setContext(null);
-
- listener.onRemoteClose(link);
- }
-
- private class TestConnectionContext extends AbstractConnectionContext {
-
- private TestConnectionContext(AMQPConnectionCallback connectionCallback) {
- super(connectionCallback, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
- }
-
- @Override
- protected void remoteLinkOpened(Link link) throws Exception {
-
- }
-
- @Override
- protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
- return null;
- }
-
- public EventHandler getListener() {
- return listener;
- }
- }
-
- private class TestConnectionCallback implements AMQPConnectionCallback {
-
- @Override
- public void close() {
-
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return null;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
-
- }
-
- @Override
- public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) {
-
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return null;
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
-
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return null;
- }
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
- return null;
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- return true;
- }
-
- @Override
- public void sendSASLSupported() {
-
- }
-
- @Override
- public boolean validateConnection(Connection connection, SASLResult saslResult) {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
deleted file mode 100644
index 0046dd6..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import java.lang.ref.WeakReference;
-
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.jboss.logging.Logger;
-import org.proton.plug.test.minimalserver.DumbServer;
-import org.proton.plug.test.minimalserver.MinimalServer;
-
-public class AbstractJMSTest {
-
- private static final Logger log = Logger.getLogger(AbstractJMSTest.class);
-
- protected final boolean useSASL;
-
- protected String address = "exampleQueue";
- protected MinimalServer server = new MinimalServer();
-
- public AbstractJMSTest(boolean useSASL) {
- this.useSASL = useSASL;
- }
-
- public void tearDown() throws Exception {
- server.stop();
- DumbServer.clear();
- }
-
- public static void forceGC() {
- System.out.println("#test forceGC");
- WeakReference<Object> dumbReference = new WeakReference<>(new Object());
- // A loop that will wait GC, using the minimalserver time as possible
- while (dumbReference.get() != null) {
- System.gc();
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- }
- }
- System.out.println("#test forceGC Done");
- }
-
- protected Connection createConnection() throws JMSException {
- final ConnectionFactory factory = createConnectionFactory();
- final Connection connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- log.warn(exception.getMessage(), exception);
- }
- });
- connection.start();
- return connection;
- }
-
- protected ConnectionFactory createConnectionFactory() {
- if (useSASL) {
- return new JmsConnectionFactory("aaaaaaaa", "aaaaaaa", "amqp://localhost:5672");
- }
- else {
- return new JmsConnectionFactory( "amqp://localhost:5672");
-
- }
- }
-
- protected Queue createQueue(Session session) throws Exception {
- return session.createQueue(address);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/Constants.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/Constants.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/Constants.java
deleted file mode 100644
index bacfd7b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/Constants.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test;
-
-public class Constants {
-
- public static final int PORT = 5672;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
deleted file mode 100644
index 4c3aaf4..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.proton.plug.AMQPClientConnectionContext;
-import org.proton.plug.AMQPClientSenderContext;
-import org.proton.plug.AMQPClientSessionContext;
-import org.proton.plug.sasl.ClientSASLPlain;
-import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
-import org.proton.plug.test.minimalserver.DumbServer;
-import org.proton.plug.util.ByteUtil;
-
-/**
- * This is simulating a JMS client against a simple server
- * This is being effectively tested by {@link org.apache.activemq.artemis.tests.integration.proton.ProtonTest} with a proper framework in place.
- * This test eventually hungs on the testsuite.
- * While it is still valid for debugging, for that reason the test will be ignored.
- * and will be kept here for debug purposes.
- */
-@Ignore // remove this to debug it
-@RunWith(Parameterized.class)
-public class ProtonTest extends AbstractJMSTest {
-
- protected Connection connection;
-
- @Parameterized.Parameters(name = "sasl={0}")
- public static Collection<Object[]> data() {
- List<Object[]> list = Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
- System.out.println("Size = " + list.size());
- return list;
- }
-
- public ProtonTest(boolean useSASL) {
- super(useSASL);
- }
-
- @Before
- public void setUp() throws Exception {
- DumbServer.clear();
- AbstractJMSTest.forceGC();
- server.start("127.0.0.1", Constants.PORT, true);
- connection = createConnection();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
-
- super.tearDown();
- }
-
- @Test
- public void testMessagesReceivedInParallel() throws Throwable {
- final int numMessages = getNumberOfMessages();
- long time = System.currentTimeMillis();
-
- final ArrayList<Throwable> exceptions = new ArrayList<>();
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- Connection connectionConsumer = null;
- try {
- connectionConsumer = createConnection();
- // connectionConsumer = connection;
- connectionConsumer.start();
- Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = createQueue(sessionConsumer);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- int count = numMessages;
- while (count > 0) {
- try {
- BytesMessage m = (BytesMessage) consumer.receive(1000);
- if (count % 1000 == 0) {
- System.out.println("Count = " + count + ", property=" + m.getStringProperty("XX"));
- }
- Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m);
- count--;
- }
- catch (JMSException e) {
- break;
- }
- }
- }
- catch (Throwable e) {
- exceptions.add(e);
- e.printStackTrace();
- }
- finally {
- try {
- // if the createconnecion wasn't commented out
- if (connectionConsumer != connection) {
- connectionConsumer.close();
- }
- }
- catch (Throwable ignored) {
- // NO OP
- }
- }
- }
- });
-
- Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- t.start();
- final Queue queue = createQueue(session);
-
- MessageProducer p = session.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < numMessages; i++) {
- BytesMessage message = session.createBytesMessage();
- // TODO: this will break stuff if I use a large number
- message.writeBytes(new byte[5]);
- message.setIntProperty("count", i);
- message.setStringProperty("XX", "count" + i);
- p.send(message);
- }
-
- long taken = (System.currentTimeMillis() - time);
- System.out.println("taken on send = " + taken + " sasl = " + useSASL);
- t.join();
-
- for (Throwable e : exceptions) {
- throw e;
- }
- taken = (System.currentTimeMillis() - time);
- System.out.println("taken = " + taken + " sasl = " + useSASL);
-
- connection.close();
- // assertEquals(0, q.getMessageCount());
- }
-
- @Test
- public void testSimpleCreateSessionAndClose() throws Throwable {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(address);
- Thread.sleep(1000);
- session.close();
- connection.close();
- }
-
- @Test
- public void testSimpleBinary() throws Throwable {
- final int numMessages = 5;
- long time = System.currentTimeMillis();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = createQueue(session);
-
- byte[] bytes = new byte[0xf + 1];
- for (int i = 0; i <= 0xf; i++) {
- bytes[i] = (byte) i;
- }
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- BytesMessage message = session.createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", i);
- p.send(message);
- }
-
- session.close();
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- BytesMessage m = (BytesMessage) consumer.receive(5000);
-
- System.out.println("length " + m.getBodyLength());
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
-
- m.reset();
-
- long size = m.getBodyLength();
- byte[] bytesReceived = new byte[(int) size];
- m.readBytes(bytesReceived);
-
- System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1));
-
- Assert.assertArrayEquals(bytes, bytesReceived);
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testMapMessage() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createQueue(session);
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < 10; i++) {
- MapMessage message = session.createMapMessage();
- message.setInt("x", i);
- message.setString("str", "str" + i);
- p.send(message);
- }
- MessageConsumer messageConsumer = session.createConsumer(queue);
- for (int i = 0; i < 10; i++) {
- MapMessage m = (MapMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- Assert.assertEquals(i, m.getInt("x"));
- Assert.assertEquals("str" + i, m.getString("str"));
- }
-
- Assert.assertNull(messageConsumer.receiveNoWait());
- }
-
- @Test
- public void testProperties() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createQueue(session);
- MessageProducer p = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- message.setText("msg:0");
- message.setBooleanProperty("true", true);
- message.setBooleanProperty("false", false);
- message.setStringProperty("foo", "bar");
- message.setDoubleProperty("double", 66.6);
- message.setFloatProperty("float", 56.789f);
- message.setIntProperty("int", 8);
- message.setByteProperty("byte", (byte) 10);
- p.send(message);
- p.send(message);
- connection.start();
- MessageConsumer messageConsumer = session.createConsumer(queue);
- TextMessage m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- Assert.assertEquals("msg:0", m.getText());
- Assert.assertEquals(m.getBooleanProperty("true"), true);
- Assert.assertEquals(m.getBooleanProperty("false"), false);
- Assert.assertEquals(m.getStringProperty("foo"), "bar");
- Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001);
- Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001);
- Assert.assertEquals(m.getIntProperty("int"), 8);
- Assert.assertEquals(m.getByteProperty("byte"), (byte) 10);
- m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- connection.close();
- }
-
- // @Test
- public void testSendWithSimpleClient() throws Exception {
- SimpleAMQPConnector connector = new SimpleAMQPConnector();
- connector.start();
- AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
-
- clientConnection.clientOpen(new ClientSASLPlain("aa", "aa"));
-
- AMQPClientSessionContext session = clientConnection.createClientSession();
- AMQPClientSenderContext clientSender = session.createSender(address, true);
-
- Properties props = new Properties();
- for (int i = 0; i < 1; i++) {
- MessageImpl message = (MessageImpl) Message.Factory.create();
-
- HashMap map = new HashMap();
-
- map.put("i", i);
- AmqpValue value = new AmqpValue(map);
- message.setBody(value);
- message.setProperties(props);
- clientSender.send(message);
- }
-
- Session clientSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- connection.start();
-
- MessageConsumer consumer = clientSession.createConsumer(createQueue(clientSession));
- for (int i = 0; i < 1; i++) {
- MapMessage msg = (MapMessage) consumer.receive(5000);
- System.out.println("Msg " + msg);
- Assert.assertNotNull(msg);
-
- System.out.println("Receive message " + i);
-
- Assert.assertEquals(0, msg.getInt("i"));
- }
- }
-
- protected int getNumberOfMessages() {
- return 10000;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
deleted file mode 100644
index 197c39e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.invm;
-
-import java.util.concurrent.Executors;
-
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.proton.plug.AMQPClientConnectionContext;
-import org.proton.plug.context.client.ProtonClientConnectionContext;
-import org.proton.plug.test.minimalclient.Connector;
-
-/**
- * This is used for testing, where we bypass Netty or any networking for test conditions only
- */
-public class InVMTestConnector implements Connector {
-
- @Override
- public void start() {
-
- }
-
- @Override
- public AMQPClientConnectionContext connect(String host, int port) throws Exception {
- return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
deleted file mode 100644
index a35e8ac..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.invm;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Connection;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.context.server.ProtonServerConnectionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.sasl.AnonymousServerSASL;
-import org.proton.plug.sasl.ServerSASLPlain;
-import org.proton.plug.test.minimalserver.MinimalSessionSPI;
-import org.proton.plug.util.ByteUtil;
-
-public class ProtonINVMSPI implements AMQPConnectionCallback {
-
- private static final Logger log = Logger.getLogger(ProtonINVMSPI.class);
-
-
- AMQPConnectionContext returningConnection;
-
- ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
-
- final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
-
- final ExecutorService returningExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
-
- public ProtonINVMSPI() {
- mainExecutor.execute(new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setName("MainExecutor-INVM");
- }
- });
- returningExecutor.execute(new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setName("ReturningExecutor-INVM");
- }
- });
- }
-
- @Override
- public void close() {
- mainExecutor.shutdown();
- }
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
- return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- return true;
- }
-
- @Override
- public void sendSASLSupported() {
-
- }
-
- @Override
- public boolean validateConnection(Connection connection, SASLResult saslResult) {
- return true;
- }
-
- @Override
- public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
- if (log.isTraceEnabled()) {
- ByteUtil.debugFrame(log, "InVM->", bytes);
- }
- final int size = bytes.writerIndex();
-
- bytes.retain();
- mainExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- if (log.isTraceEnabled()) {
- ByteUtil.debugFrame(log, "InVMDone->", bytes);
- }
- serverConnection.inputBuffer(bytes);
- try {
- connection.outputDone(size);
- }
- catch (Exception e) {
- log.warn(e.getMessage(), e);
- }
- }
- finally {
- bytes.release();
- }
- }
- });
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
- returningConnection = connection;
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return returningConnection;
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return null;
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return null;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
-
- }
-
- class ReturnSPI implements AMQPConnectionCallback {
-
- @Override
- public void close() {
-
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return null;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
-
- }
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
- return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- return false;
- }
-
- @Override
- public void sendSASLSupported() {
-
- }
-
- @Override
- public boolean validateConnection(Connection connection, SASLResult saslResult) {
- return true;
- }
-
- @Override
- public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
-
- final int size = bytes.writerIndex();
- ByteUtil.debugFrame(log, "InVM<-", bytes);
-
- bytes.retain();
- returningExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
- ByteUtil.debugFrame(log, "InVM done<-", bytes);
-
- returningConnection.inputBuffer(bytes);
- try {
- connection.outputDone(size);
- }
- catch (Exception e) {
- log.warn(e.getMessage(), e);
- }
-
- }
- finally {
- bytes.release();
- }
- }
- });
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return new MinimalSessionSPI();
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
-
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
deleted file mode 100644
index 85e4c02..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalclient;
-
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Connection;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.sasl.AnonymousServerSASL;
-import org.proton.plug.sasl.ServerSASLPlain;
-import org.proton.plug.util.ByteUtil;
-import org.proton.plug.util.ReusableLatch;
-
-public class AMQPClientSPI implements AMQPConnectionCallback {
-
- private static final Logger log = Logger.getLogger(AMQPClientSPI.class);
- final Channel channel;
- protected AMQPConnectionContext connection;
-
- public AMQPClientSPI(Channel channel) {
- this.channel = channel;
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
- this.connection = connection;
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return connection;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
- return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- return true;
- }
-
- @Override
- public void sendSASLSupported() {
-
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return null;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
-
- }
-
- @Override
- public boolean validateConnection(Connection connection, SASLResult saslResult) {
- return true;
- }
-
- final ReusableLatch latch = new ReusableLatch(0);
-
- @Override
- public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
- if (log.isTraceEnabled()) {
- ByteUtil.debugFrame(log, "Bytes leaving client", bytes);
- }
-
- final int bufferSize = bytes.writerIndex();
-
- latch.countUp();
-
- channel.writeAndFlush(bytes).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- //
- // connection.outputDone(bufferSize);
- latch.countDown();
- }
- });
-
- if (connection.isSyncOnFlush()) {
- try {
- if (!latch.await(5, TimeUnit.SECONDS)) {
- log.debug("Flush took longer than 5 seconds!!!");
- }
- }
- catch (Throwable e) {
- log.warn(e.getMessage(), e);
- }
- }
-
- connection.outputDone(bufferSize);
-
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
deleted file mode 100644
index e6b67c2..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalclient;
-
-import org.proton.plug.AMQPClientConnectionContext;
-
-public interface Connector {
-
- void start();
-
- AMQPClientConnectionContext connect(String host, int port) throws Exception;
-}