You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/02 18:17:30 UTC

[3/4] activemq-artemis git commit: ARTEMIS-832 Openwire was ignoring data syncs.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
new file mode 100644
index 0000000..c4c2214
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SyncSendTest extends ActiveMQTestBase {
+
+   private static long totalRecordTime = -1; // cached getTimePerSync() result in nanoseconds; negative means "not measured yet"
+   private static final int RECORDS = 300; // messages timed per phase, and the factor the probe average is scaled by
+   private static final int MEASURE_RECORDS = 100; // timed force() calls performed by the disk probe
+   private static final int WRAMP_UP = 100; // untimed warm-up iterations run before each measurement ("wramp" = warm)
+
+   // Builds every storage/protocol pair to run; "libaio" pairs are skipped when LibaioContext is not loaded.
+   @Parameterized.Parameters(name = "storage={0}, protocol={1}")
+   public static Collection<Object[]> getParameters() {
+      String[] storages = {"libaio", "nio", "null"};
+      String[] protocols = {"core", "openwire", "amqp"};
+
+      ArrayList<Object[]> combinations = new ArrayList<>();
+      for (String s : storages) {
+         if (s.equals("libaio") && !LibaioContext.isLoaded()) {
+            continue;
+         }
+         for (String p : protocols) {
+            combinations.add(new Object[]{s, p});
+         }
+      }
+      return combinations;
+   }
+
+   private final String storage; // "libaio", "nio" or "null", as produced by getParameters()
+   private final String protocol; // "core", "openwire" or "amqp", as produced by getParameters()
+
+   public SyncSendTest(String storage, String protocol) { // receives one {storage, protocol} pair from getParameters()
+      this.storage = storage;
+      this.protocol = protocol;
+   }
+
+   ActiveMQServer server; // assigned in setUp()
+   JMSServerManagerImpl jms; // wraps server; created and started in setUp()
+
+   // Creates the broker and starts its JMS manager, deriving the createServer flag and the
+   // journal type from the "storage" test parameter.
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      // "null" storage passes false as createServer's first argument, everything else true
+      // (presumably that flag toggles real journal files -- confirm against ActiveMQTestBase)
+      final boolean persistent = !storage.equals("null");
+      server = createServer(persistent, true);
+
+      jms = new JMSServerManagerImpl(server);
+
+      // only "libaio" selects the ASYNCIO journal; "nio" and "null" are both configured as NIO
+      final JournalType journalType = storage.equals("libaio") ? JournalType.ASYNCIO : JournalType.NIO;
+      server.getConfiguration().setJournalType(journalType);
+
+      // start through the JMS manager created above
+      jms.start();
+   }
+
+   /**
+    * Estimates, in nanoseconds, how long the disk needs to sync {@link #RECORDS} small writes.
+    * The result is cached in the static {@code totalRecordTime}, so the probe only runs while that
+    * field is still negative; "null" storage skips the probe and reports 0.
+    *
+    * @return estimated nanoseconds needed to force RECORDS writes to disk, or 0 for "null" storage
+    * @throws Exception if the probe file cannot be created, written or forced
+    */
+   private long getTimePerSync() throws Exception {
+      if (storage.equals("null")) {
+         return 0;
+      }
+      if (totalRecordTime < 0) {
+         File measureFile = temporaryFolder.newFile();
+         System.out.println("File::" + measureFile);
+         // try-with-resources closes the file and its channel even when an assertion below fails
+         try (RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw");
+              FileChannel channel = rfile.getChannel()) {
+            ByteBuffer buffer = ByteBuffer.allocate(10);
+            buffer.put(new byte[10]);
+            buffer.position(0);
+            // one fully synced write up front, outside the timed section
+            channel.position(0);
+            Assert.assertEquals(10, channel.write(buffer));
+            channel.force(true);
+
+            long time = System.nanoTime();
+            for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) {
+               if (i == WRAMP_UP) {
+                  time = System.nanoTime(); // warm-up is over: restart the clock
+               }
+               channel.position(0);
+               buffer.position(0);
+               buffer.putInt(i);
+               buffer.position(0);
+               Assert.assertEquals(10, channel.write(buffer));
+               channel.force(false);
+            }
+            long timeEnd = System.nanoTime();
+            // average cost of one timed force(), scaled to the RECORDS messages the test times
+            totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS;
+         }
+         System.out.println("total time = " + totalRecordTime);
+      }
+      return totalRecordTime;
+   }
+
+   // Sends and then consumes RECORDS timed messages (preceded by WRAMP_UP warm-up ones) over the
+   // protocol under test, failing when a phase finishes faster than the disk-sync estimate
+   // returned by getTimePerSync().
+   @Test
+   public void testSendConsumeAudoACK() throws Exception {
+
+      final long recordTime = getTimePerSync();
+      final int totalMessages = RECORDS + WRAMP_UP;
+
+      jms.createQueue(true, "queue", null, true, null);
+
+      final Connection connection = newCF().createConnection();
+      try {
+         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // the AMQP client looks the queue up under the name "jms.queue.queue"
+         final String queueName = protocol.equals("amqp") ? "jms.queue.queue" : "queue";
+         final Queue queue = session.createQueue(queueName);
+         final MessageProducer producer = session.createProducer(queue);
+
+         // ---- send phase ----
+         long start = System.nanoTime();
+         for (int sent = 0; sent < totalMessages; sent++) {
+            if (sent == WRAMP_UP) {
+               start = System.nanoTime(); // warm-up is over: restart the clock
+            }
+            producer.send(session.createMessage());
+         }
+
+         long end = System.nanoTime();
+         long elapsed = end - start;
+         // the nanosecond values are printed next to their millisecond conversion
+         System.out.println("end - start = " + elapsed + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(elapsed));
+         System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
+
+         // a send phase quicker than the probe estimate fails the test
+         final boolean sentTooFast = elapsed < recordTime;
+         Assert.assertFalse("Messages are being sent too fast! Faster than the disk would be able to sync!", sentTooFast);
+
+         // ---- consume phase ----
+         connection.start();
+         final MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int received = 0; received < totalMessages; received++) {
+            if (received == WRAMP_UP) {
+               start = System.nanoTime(); // warm-up is over: restart the clock
+            }
+            // each receive waits up to 5000 ms; a null result fails the test
+            final Message msg = consumer.receive(5000);
+            Assert.assertNotNull(msg);
+         }
+
+         end = System.nanoTime();
+         elapsed = end - start;
+         System.out.println("end - start = " + elapsed + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(elapsed));
+         System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
+
+         // There's no way to sync on ack for AMQP
+         final boolean ackedTooFast = !protocol.equals("amqp") && elapsed < recordTime;
+         Assert.assertFalse("Messages are being acked too fast! Faster than the disk would be able to sync!", ackedTooFast);
+      } finally {
+         connection.close();
+      }
+   }
+
+   // Client factory per protocol: core blocks on acks, openwire disables async acks, amqp forces async acks.
+   private ConnectionFactory newCF() {
+      switch (protocol) {
+         case "core":
+            ActiveMQConnectionFactory coreFactory = new ActiveMQConnectionFactory();
+            coreFactory.setBlockOnAcknowledge(true);
+            return coreFactory;
+         case "amqp":
+            JmsConnectionFactory amqpFactory = new JmsConnectionFactory("amqp://localhost:61616");
+            amqpFactory.setForceAsyncAcks(true);
+            return amqpFactory;
+         default: // any other protocol value is treated as openwire
+            org.apache.activemq.ActiveMQConnectionFactory openwireFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
+            openwireFactory.setSendAcksAsync(false);
+            return openwireFactory;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
index 84a3ecc..c445a86 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.vertx.java.core.Handler;
 import org.vertx.java.core.Vertx;
@@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
 
 /**
  * This class tests the basics of ActiveMQ
- * vertx integration
+ * vertx
+ * integration
  */
+@Ignore
 public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
 
    private PlatformManager vertxManager;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5e5ac0f0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index d0676ee..0316945 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -60,6 +60,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
    }
 
    @Override
+   public SequentialFileFactory setDatasync(boolean enabled) {
+      return this; // the fake ignores the flag, but returns itself so chained calls on the result do not hit null
+   }
+
+   @Override
+   public boolean isDatasync() {
+      return false; // this fake does not track the datasync flag, so it always reports false
+   }
+
+   @Override
    public int getMaxIO() {
       return 1;
    }