You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2014/02/25 14:45:54 UTC
svn commit: r1571687 - in /ace/trunk/org.apache.ace.log:
src/org/apache/ace/log/server/store/impl/
test/org/apache/ace/log/server/store/impl/
Author: jawi
Date: Tue Feb 25 13:45:54 2014
New Revision: 1571687
URL: http://svn.apache.org/r1571687
Log:
ACE-461 - added additional concurrency tests for LogStoreImpl:
- also fixed Java7-style generics.
Added:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java (with props)
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1571687&r1=1571686&r2=1571687&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java Tue Feb 25 13:45:54 2014
@@ -36,7 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
@@ -79,8 +79,8 @@ public class LogStoreImpl implements Log
}
public List<Event> get(Descriptor descriptor) throws IOException {
+ obtainLock(descriptor.getTargetID(), descriptor.getStoreID());
try {
- obtainLock(descriptor.getTargetID(), descriptor.getStoreID());
return getInternal(descriptor);
}
finally {
@@ -278,7 +278,7 @@ public class LogStoreImpl implements Log
high = Long.MAX_VALUE;
}
// send (eventadmin)event about a new (log)event being stored
- Dictionary props = new Hashtable();
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(LogStore.EVENT_PROP_LOGNAME, m_name);
props.put(LogStore.EVENT_PROP_LOG_EVENT, event);
m_eventAdmin.postEvent(new org.osgi.service.event.Event(LogStore.EVENT_TOPIC, props));
@@ -419,8 +419,8 @@ public class LogStoreImpl implements Log
}
private void clean(String targetID, Long logID) throws IOException {
+ obtainLock(targetID, logID);
try {
- obtainLock(targetID, logID);
List<Event> events = getInternal(new Descriptor(targetID, logID, SortedRangeSet.FULL_SET));
if (events.size() > m_maxEvents) {
for (int i = 0; i < m_maxEvents; i++) {
@@ -454,15 +454,10 @@ public class LogStoreImpl implements Log
// try to obtain the lock if we could not lock it on the first try
if (alreadyLocked) {
- int nrOfTries = 1;
- while (alreadyLocked && nrOfTries < 10000) {
- try {
- Thread.sleep(1);
- }
- catch (InterruptedException e) {
- break;
- }
- nrOfTries++;
+ int nrOfTries = 0;
+ while (alreadyLocked && nrOfTries++ < 10000) {
+ LockSupport.parkNanos(50);
+
synchronized (lockedLogs) {
alreadyLocked = lockedLogs.contains(logID);
if (!alreadyLocked) {
Added: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java?rev=1571687&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java (added)
+++ ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java Tue Feb 25 13:45:54 2014
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.store.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ace.feedback.Descriptor;
+import org.apache.ace.feedback.Event;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+import org.apache.ace.test.utils.TestUtils;
+import org.osgi.service.event.EventAdmin;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test cases for {@link LogStoreImpl}.
+ */
+public class LogStoreImplConcurrencyTest {
+ private static final String TARGET_ID = "targetId";
+ private static final long STORE_ID = 12345;
+
+ private static class Reader implements Runnable {
+ private final String m_name;
+ private final CountDownLatch m_start;
+ private final CountDownLatch m_stop;
+ private final LogStoreImpl m_store;
+ private final ConcurrentMap<Long, Boolean> m_seen = new ConcurrentHashMap<Long, Boolean>();
+ private final int m_count;
+
+ public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
+ this(store, start, stop, count, 0);
+ }
+
+ public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial) {
+ m_name = "Reader-" + initial;
+ m_store = store;
+ m_start = start;
+ m_stop = stop;
+ m_count = count;
+ }
+
+ @Override
+ public void run() {
+ Random rnd = new Random();
+
+ try {
+ m_start.await();
+
+ System.out.printf("Reader (%s) starting to read %d records...%n", m_name, m_count);
+
+ while (m_seen.size() < m_count) {
+ try {
+ if (rnd.nextInt(1000) >= 995) {
+ // perform a random cleanup...
+ m_store.clean();
+ }
+ List<Descriptor> descriptors = m_store.getDescriptors(TARGET_ID);
+ for (Descriptor desc : descriptors) {
+ SortedRangeSet rangeSet = desc.getRangeSet();
+ RangeIterator rangeIter = rangeSet.iterator();
+ while (rangeIter.hasNext()) {
+ m_seen.putIfAbsent(Long.valueOf(rangeIter.next()), Boolean.TRUE);
+ }
+ }
+ }
+ catch (IOException e) {
+ System.out.printf("I/O exception (%s) caught: %s in %s.%n", e.getClass().getSimpleName(), e.getMessage(), getCaller(e));
+ }
+ }
+
+ System.out.printf("Reader (%s) finished with %d records read...%n", m_name, m_seen.size());
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ m_stop.countDown();
+
+ System.out.println("Ending reader (" + m_name + ")");
+ }
+ }
+ }
+
+ private static class Writer implements Runnable {
+ private final String m_name;
+ private final CountDownLatch m_start;
+ private final CountDownLatch m_stop;
+ private final LogStoreImpl m_store;
+ private final ConcurrentMap<Long, Event> m_written = new ConcurrentHashMap<Long, Event>();
+ private final int m_count;
+ private final int m_initValue;
+ private final int m_stepSize;
+
+ public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
+ this(store, start, stop, count, 0, 1);
+ }
+
+ public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial, int stepSize) {
+ m_name = "Writer-" + initial;
+ m_store = store;
+ m_start = start;
+ m_stop = stop;
+ m_count = count;
+ m_initValue = initial;
+ m_stepSize = stepSize;
+ }
+
+ @Override
+ public void run() {
+ Random rnd = new Random();
+
+ try {
+ m_start.await();
+
+ System.out.printf("Writer (%s) starts writing %d records...%n", m_name, m_count);
+
+ for (int i = m_initValue; i < m_count; i += m_stepSize) {
+ long id = i;
+ Event event = new Event(TARGET_ID, STORE_ID, id, id, rnd.nextInt(10));
+
+ m_store.put(Arrays.asList(event));
+ m_written.putIfAbsent(Long.valueOf(id), event);
+ }
+
+ System.out.printf("Writer (%s) finished with %d records written...%n", m_name, m_written.size());
+ }
+ catch (InterruptedException e) {
+ // ok, stop...
+ }
+ catch (IOException exception) {
+ exception.printStackTrace();
+ }
+ finally {
+ m_stop.countDown();
+
+ System.out.println("Ending writer (" + m_name + ")");
+ }
+ }
+ }
+
+ private static String getCaller(Exception e) {
+ StringBuilder sb = new StringBuilder();
+ StackTraceElement[] st = e.getStackTrace();
+ int n = Math.min(st.length, 1);
+ int m = Math.min(st.length, 4);
+ for (int i = n; i < m; i++) {
+ if (i > n) {
+ sb.append(" -> ");
+ }
+ StackTraceElement ste = st[i];
+ sb.append(ste.getClassName()).append(".").append(ste.getMethodName()).append("(").append(ste.getLineNumber()).append(")");
+ }
+ return sb.toString();
+ }
+
+ private File m_baseDir;
+ private ExecutorService m_executor;
+ private CompletionService<Boolean> m_completionService;
+
+ /**
+ * Tests that concurrent use of a {@link LogStoreImpl} with multiple readers and multiple writers works as expected.
+ */
+ @Test(enabled = false)
+ public void testConcurrentUseMultipleReaderAndMultipleWriters() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 10000;
+ final int readerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+ final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+
+ final LogStoreImpl store = createLogStore();
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(writerCount + readerCount);
+
+ Writer[] writers = new Writer[writerCount];
+ for (int i = 0; i < writerCount; i++) {
+ writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+ }
+
+ Reader[] readers = new Reader[readerCount];
+ for (int i = 0; i < readerCount; i++) {
+ readers[i] = new Reader(store, start, stop, recordCount, i);
+ }
+
+ // gents, start your engines...
+ for (int i = 0; i < readers.length; i++) {
+ m_completionService.submit(readers[i], Boolean.TRUE);
+ }
+ for (int i = 0; i < writers.length; i++) {
+ m_completionService.submit(writers[i], Boolean.TRUE);
+ }
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // waiting for all threads to finish...
+ for (int i = 0, r = 0; r < 10 && i < writerCount + readerCount; i++) {
+ Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
+ if (future == null) {
+ r++;
+ }
+ }
+ assertTrue(stop.await(5, TimeUnit.SECONDS));
+
+ int readCount = 0;
+ for (int i = 0; i < readers.length; i++) {
+ readCount += readers[i].m_seen.size();
+ }
+ int writtenCount = 0;
+ for (int i = 0; i < writers.length; i++) {
+ writtenCount += writers[i].m_written.size();
+ }
+
+ assertEquals(recordCount, writtenCount, "Not all records were written?");
+ // All readers read the exact same data, so we've got N copies of it...
+ assertEquals(readCount, readerCount * writtenCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writers);
+ }
+
+ /**
+ * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and multiple writers works as expected.
+ */
+ @Test(enabled = false)
+ public void testConcurrentUseSingleReaderAndMultipleWriters() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 10000;
+ final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+
+ final LogStoreImpl store = createLogStore();
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(writerCount + 1);
+
+ Writer[] writers = new Writer[writerCount];
+
+ for (int i = 0; i < writerCount; i++) {
+ writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+ }
+
+ Reader reader = new Reader(store, start, stop, recordCount);
+
+ // gents, start your engines...
+ m_completionService.submit(reader, Boolean.TRUE);
+ for (int i = 0; i < writers.length; i++) {
+ m_completionService.submit(writers[i], Boolean.TRUE);
+ }
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // waiting for all threads to finish...
+ for (int i = 0, r = 0; r < 10 && i < writerCount + 1; i++) {
+ Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
+ if (future == null) {
+ r++;
+ }
+ }
+ assertTrue(stop.await(5, TimeUnit.SECONDS));
+
+ int writtenCount = 0;
+ for (int i = 0; i < writers.length; i++) {
+ writtenCount += writers[i].m_written.size();
+ }
+
+ int readCount = reader.m_seen.size();
+
+ assertEquals(recordCount, writtenCount, "Not all records were written?");
+ assertEquals(readCount, writtenCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writers);
+ }
+
+ /**
+ * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and writer works as expected.
+ */
+ @Test
+ public void testConcurrentUseSingleReaderAndSingleWriter() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 10000;
+
+ final LogStoreImpl store = createLogStore();
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(2);
+
+ Writer writer = new Writer(store, start, stop, recordCount);
+ Reader reader = new Reader(store, start, stop, recordCount);
+
+ // gents, start your engines...
+ m_completionService.submit(writer, Boolean.TRUE);
+ m_completionService.submit(reader, Boolean.TRUE);
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // waiting both threads to finish...
+ assertTrue(stop.await(120, TimeUnit.SECONDS));
+
+ int writeCount = writer.m_written.size();
+ int readCount = reader.m_seen.size();
+
+ assertEquals(recordCount, writeCount, "Not all records were written?");
+ assertEquals(readCount, writeCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writer);
+ }
+
+ @Test
+ public void testTimedWrite() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 10000;
+
+ final LogStoreImpl store = createLogStore();
+
+ long start = System.nanoTime();
+ for (int i = 0; i < recordCount; i++) {
+ store.put(Arrays.asList(new Event("1,2,3,4,5")));
+ }
+ long end = System.nanoTime();
+ System.out.printf("Writing %d records took %.3f ms.%n", recordCount, (end - start) / 1.0e6);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ protected void setUp() throws Exception {
+ m_baseDir = File.createTempFile("logstore", "txt");
+ m_baseDir.delete();
+ m_baseDir.mkdirs();
+
+ m_executor = Executors.newCachedThreadPool();
+ m_completionService = new ExecutorCompletionService<Boolean>(m_executor);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ protected void tearDown() throws InterruptedException {
+ m_executor.shutdownNow();
+ m_executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ private LogStoreImpl createLogStore() throws Exception {
+ LogStoreImpl logStore = new LogStoreImpl(m_baseDir, "log");
+ TestUtils.configureObject(logStore, EventAdmin.class);
+ logStore.start();
+ return logStore;
+ }
+
+ private void verifyStoreContents(final LogStoreImpl store, final int count, Writer... writers) throws IOException {
+ // Verify the written file...
+ List<Descriptor> descriptors = store.getDescriptors();
+
+ long expectedID = 0;
+ for (Descriptor desc : descriptors) {
+ SortedRangeSet rangeSet = desc.getRangeSet();
+ RangeIterator rangeIter = rangeSet.iterator();
+
+ while (rangeIter.hasNext()) {
+ long id = rangeIter.next();
+
+ Event expectedEntry = null;
+ for (int i = 0; (expectedEntry == null) && i < writers.length; i++) {
+ expectedEntry = writers[i].m_written.remove(id);
+ }
+ assertNotNull(expectedEntry, "Event ID #" + id + " never written?!");
+ // Test continuation of written data...
+ assertEquals(expectedEntry.getID(), expectedID++, "Entry ID mismatch?!");
+ }
+ }
+ }
+}
Propchange: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java?rev=1571687&r1=1571686&r2=1571687&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java (original)
+++ ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java Tue Feb 25 13:45:54 2014
@@ -218,7 +218,7 @@ public class ServerLogStoreTester {
es.execute(new Runnable() {
@Override
public void run() {
- List<Event> list = new ArrayList<>();
+ List<Event> list = new ArrayList<Event>();
list.add(new Event(t, l, i, System.currentTimeMillis(), AuditEvent.FRAMEWORK_STARTED, props));
try {
m_logStore.put(list);