You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/02 18:17:30 UTC
[3/4] activemq-artemis git commit: ARTEMIS-832 Openwire was ignoring
data syncs.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
new file mode 100644
index 0000000..c4c2214
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SyncSendTest extends ActiveMQTestBase {
+
+ private static long totalRecordTime = -1;
+ private static final int RECORDS = 300;
+ private static final int MEASURE_RECORDS = 100;
+ private static final int WRAMP_UP = 100;
+
+ @Parameterized.Parameters(name = "storage={0}, protocol={1}")
+ public static Collection getParameters() {
+ Object[] storages = new Object[]{"libaio", "nio", "null"};
+ Object[] protocols = new Object[]{"core", "openwire", "amqp"};
+
+ ArrayList<Object[]> objects = new ArrayList<>();
+ for (Object s : storages) {
+ if (s.equals("libaio") && !LibaioContext.isLoaded()) {
+ continue;
+ }
+ for (Object p : protocols) {
+ objects.add(new Object[]{s, p});
+ }
+ }
+
+ return objects;
+ }
+
+ private final String storage;
+ private final String protocol;
+
+ public SyncSendTest(String storage, String protocol) {
+ this.storage = storage;
+ this.protocol = protocol;
+ }
+
+ ActiveMQServer server;
+ JMSServerManagerImpl jms;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ if (storage.equals("null")) {
+ server = createServer(false, true);
+ } else {
+ server = createServer(true, true);
+ }
+
+ jms = new JMSServerManagerImpl(server);
+
+ if (storage.equals("libaio")) {
+ server.getConfiguration().setJournalType(JournalType.ASYNCIO);
+ } else {
+ server.getConfiguration().setJournalType(JournalType.NIO);
+
+ }
+ jms.start();
+ }
+
+ private long getTimePerSync() throws Exception {
+
+ if (storage.equals("null")) {
+ return 0;
+ }
+ if (totalRecordTime < 0) {
+ File measureFile = temporaryFolder.newFile();
+
+ System.out.println("File::" + measureFile);
+
+ RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw");
+ FileChannel channel = rfile.getChannel();
+
+ channel.position(0);
+
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put(new byte[10]);
+ buffer.position(0);
+
+ Assert.assertEquals(10, channel.write(buffer));
+ channel.force(true);
+
+ long time = System.nanoTime();
+
+ for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) {
+ if (i == WRAMP_UP) {
+ time = System.nanoTime();
+ }
+ channel.position(0);
+ buffer.position(0);
+ buffer.putInt(i);
+ buffer.position(0);
+ Assert.assertEquals(10, channel.write(buffer));
+ channel.force(false);
+ }
+
+ long timeEnd = System.nanoTime();
+
+ totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS;
+
+ System.out.println("total time = " + totalRecordTime);
+
+ }
+ return totalRecordTime;
+
+ }
+
+ @Test
+ public void testSendConsumeAudoACK() throws Exception {
+
+ long recordTime = getTimePerSync();
+
+ jms.createQueue(true, "queue", null, true, null);
+
+ ConnectionFactory factory = newCF();
+
+ Connection connection = factory.createConnection();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue;
+ if (protocol.equals("amqp")) {
+ queue = session.createQueue("jms.queue.queue");
+ } else {
+ queue = session.createQueue("queue");
+ }
+ MessageProducer producer = session.createProducer(queue);
+
+ long start = System.nanoTime();
+
+ for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
+ if (i == WRAMP_UP) {
+ start = System.nanoTime(); // wramp up
+ }
+ producer.send(session.createMessage());
+ }
+
+ long end = System.nanoTime();
+
+ System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
+ System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
+
+ if ((end - start) < recordTime) {
+ Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!");
+ }
+
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
+ if (i == WRAMP_UP) {
+ start = System.nanoTime(); // wramp up
+ }
+ Message msg = consumer.receive(5000);
+ Assert.assertNotNull(msg);
+ }
+
+ end = System.nanoTime();
+
+ System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
+ System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
+
+ // There's no way to sync on ack for AMQP
+ if (!protocol.equals("amqp") && (end - start) < recordTime) {
+ Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!");
+ }
+ } finally {
+ connection.close();
+ }
+
+ }
+
+ // this will set ack as synchronous, to make sure we make proper measures against the sync on disk
+ private ConnectionFactory newCF() {
+ if (protocol.equals("core")) {
+ ConnectionFactory factory = new ActiveMQConnectionFactory();
+ ((ActiveMQConnectionFactory) factory).setBlockOnAcknowledge(true);
+ return factory;
+ } else if (protocol.equals("amqp")) {
+ final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+ factory.setForceAsyncAcks(true);
+ return factory;
+ } else {
+ org.apache.activemq.ActiveMQConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
+ cf.setSendAcksAsync(false);
+ return cf;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
index 84a3ecc..c445a86 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
@@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
/**
* This class tests the basics of ActiveMQ
- * vertx integration
+ * vertx
+ * integration
*/
+@Ignore
public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private PlatformManager vertxManager;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index d0676ee..0316945 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -60,6 +60,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
}
@Override
+ public SequentialFileFactory setDatasync(boolean enabled) {
+ return null;
+ }
+
+ @Override
+ public boolean isDatasync() {
+ return false; // this fake never reports datasync as enabled
+ }
+
+ @Override
public int getMaxIO() {
return 1;
}