You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ma...@apache.org on 2014/02/25 11:46:47 UTC
svn commit: r1571642 - in /ace/trunk/org.apache.ace.log:
src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Author: marrs
Date: Tue Feb 25 10:46:47 2014
New Revision: 1571642
URL: http://svn.apache.org/r1571642
Log:
ACE-461 Committed a fix (and did some reformatting), plus a test case.
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1571642&r1=1571641&r2=1571642&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java Tue Feb 25 10:46:47 2014
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
@@ -77,11 +78,7 @@ public class LogStoreImpl implements Log
}
}
- /**
- * @see org.apache.ace.log.server.store.LogStore#get(org.apache.ace.log.Descriptor)
- */
- public List<Event> get(Descriptor descriptor)
- throws IOException {
+ public List<Event> get(Descriptor descriptor) throws IOException {
try {
obtainLock(descriptor.getTargetID(), descriptor.getStoreID());
return getInternal(descriptor);
@@ -106,17 +103,14 @@ public class LogStoreImpl implements Log
final SortedRangeSet set = descriptor.getRangeSet();
BufferedReader in = null;
try {
- File log = new File(new File(m_dir,
- targetIDToFilename(descriptor.getTargetID())),
- String.valueOf(descriptor.getStoreID()));
+ File log = new File(new File(m_dir, targetIDToFilename(descriptor.getTargetID())), String.valueOf(descriptor.getStoreID()));
if (!log.isFile()) {
return result;
}
in = new BufferedReader(new FileReader(log));
String file = log.getAbsolutePath();
long counter = 0;
- for (String line = in.readLine(); line != null; line = in
- .readLine()) {
+ for (String line = in.readLine(); line != null; line = in.readLine()) {
Event event = new Event(line);
long id = event.getID();
if ((counter != -1) && ++counter == id) {
@@ -149,21 +143,13 @@ public class LogStoreImpl implements Log
return result;
}
- /**
- * @see org.apache.ace.log.server.store.LogStore#getDescriptor(String, long)
- */
- public Descriptor getDescriptor(String targetID, long logID)
- throws IOException {
- Long high = m_fileToID.get(new File(new File(m_dir,
- targetIDToFilename(targetID)), String.valueOf(logID))
- .getAbsolutePath());
+ public Descriptor getDescriptor(String targetID, long logID) throws IOException {
+ Long high = m_fileToID.get(new File(new File(m_dir, targetIDToFilename(targetID)), String.valueOf(logID)).getAbsolutePath());
if (high != null) {
Range r = new Range(1, high);
- return new Descriptor(targetID, logID, new SortedRangeSet(
- r.toRepresentation()));
+ return new Descriptor(targetID, logID, new SortedRangeSet(r.toRepresentation()));
}
- List<Event> events = get(new Descriptor(targetID, logID,
- SortedRangeSet.FULL_SET));
+ List<Event> events = get(new Descriptor(targetID, logID, SortedRangeSet.FULL_SET));
long[] idsArray = new long[events.size()];
int i = 0;
@@ -173,11 +159,7 @@ public class LogStoreImpl implements Log
return new Descriptor(targetID, logID, new SortedRangeSet(idsArray));
}
- /**
- * @see org.apache.ace.log.server.store.LogStore#getDescriptors(String)
- */
- public List<Descriptor> getDescriptors(String targetID)
- throws IOException {
+ public List<Descriptor> getDescriptors(String targetID) throws IOException {
File dir = new File(m_dir, targetIDToFilename(targetID));
List<Descriptor> result = new ArrayList<Descriptor>();
if (!dir.isDirectory()) {
@@ -191,9 +173,6 @@ public class LogStoreImpl implements Log
return result;
}
- /**
- * @see org.apache.ace.log.server.store.LogStore#getDescriptors()
- */
public List<Descriptor> getDescriptors() throws IOException {
List<Descriptor> result = new ArrayList<Descriptor>();
for (String name : notNull(m_dir.list())) {
@@ -202,15 +181,12 @@ public class LogStoreImpl implements Log
return result;
}
- /**
- * @see org.apache.ace.log.server.store.LogStore#put(java.util.List)
- */
public void put(List<Event> events) throws IOException {
Map<String, Map<Long, List<Event>>> sorted = sort(events);
for (String targetID : sorted.keySet()) {
for (Long logID : sorted.get(targetID).keySet()) {
+ obtainLock(targetID, logID);
try {
- obtainLock(targetID, logID);
put(targetID, logID, sorted.get(targetID).get(logID));
}
finally {
@@ -233,8 +209,7 @@ public class LogStoreImpl implements Log
* @throws java.io.IOException
* in case of any error.
*/
- protected void put(String targetID, Long logID,
- List<Event> list) throws IOException {
+ protected void put(String targetID, Long logID, List<Event> list) throws IOException {
if ((list == null) || (list.size() == 0)) {
// nothing to add, so return
return;
@@ -243,8 +218,7 @@ public class LogStoreImpl implements Log
// 1. we can append events at the end of the existing file
// 2. we need to insert events in the existing file (meaning we have to
// rewrite basically the whole file)
- String file = new File(new File(m_dir, targetIDToFilename(targetID)),
- String.valueOf(logID)).getAbsolutePath();
+ String file = new File(new File(m_dir, targetIDToFilename(targetID)), String.valueOf(logID)).getAbsolutePath();
Long highest = m_fileToID.get(file);
boolean cached = false;
if (highest != null) {
@@ -254,8 +228,7 @@ public class LogStoreImpl implements Log
}
List<Event> events = null;
if (!cached) {
- events = getInternal(new Descriptor(targetID, logID,
- SortedRangeSet.FULL_SET));
+ events = getInternal(new Descriptor(targetID, logID, SortedRangeSet.FULL_SET));
// remove duplicates first
list.removeAll(events);
@@ -277,12 +250,9 @@ public class LogStoreImpl implements Log
if (!dir.isDirectory() && !dir.mkdirs()) {
throw new IOException("Unable to create backup store.");
}
- if (!removeEvents && (cached
- || ((events.size() == 0) || (events.get(events.size() - 1)
- .getID() < list.get(0).getID())))) {
+ if (!removeEvents && (cached || ((events.size() == 0) || (events.get(events.size() - 1).getID() < list.get(0).getID())))) {
// we can append to the existing file without need to remove records
- out = new PrintWriter(new FileWriter(new File(dir,
- logID.toString()), true));
+ out = new PrintWriter(new FileWriter(new File(dir, logID.toString()), true));
}
else {
// we have to merge the lists
@@ -295,12 +265,12 @@ public class LogStoreImpl implements Log
list.remove(0);
}
}
- out = new PrintWriter(new FileWriter(new File(dir,
- logID.toString())));
+ out = new PrintWriter(new FileWriter(new File(dir, logID.toString())));
}
long high = 0;
for (Event event : list) {
- out.println(event.toRepresentation());
+ String representation = event.toRepresentation();
+ out.println(representation);
if (high < event.getID()) {
high = event.getID();
}
@@ -341,8 +311,7 @@ public class LogStoreImpl implements Log
protected Map<String, Map<Long, List<Event>>> sort(List<Event> events) {
Map<String, Map<Long, List<Event>>> result = new HashMap<String, Map<Long, List<Event>>>();
for (Event event : events) {
- Map<Long, List<Event>> target = result
- .get(event.getTargetID());
+ Map<Long, List<Event>> target = result.get(event.getTargetID());
if (target == null) {
target = new HashMap<Long, List<Event>>();
@@ -365,8 +334,7 @@ public class LogStoreImpl implements Log
*/
private <T> T notNull(T target) throws IOException {
if (target == null) {
- throw new IOException(
- "Unknown IO error while trying to access the store.");
+ throw new IOException("Unknown IO error while trying to access the store.");
}
return target;
}
@@ -487,9 +455,9 @@ public class LogStoreImpl implements Log
// try to obtain the lock if we could not lock it on the first try
if (alreadyLocked) {
int nrOfTries = 1;
- while (alreadyLocked && nrOfTries < 10) {
+ while (alreadyLocked && nrOfTries < 10000) {
try {
- Thread.sleep(20);
+ Thread.sleep(1);
}
catch (InterruptedException e) {
break;
Modified: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java?rev=1571642&r1=1571641&r2=1571642&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java (original)
+++ ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java Tue Feb 25 10:46:47 2014
@@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.ace.feedback.AuditEvent;
import org.apache.ace.feedback.Descriptor;
@@ -194,6 +197,60 @@ public class ServerLogStoreTester {
}
}
+ // Concurrency test for the log store: every event is submitted as its own put() task on an
+ // 8-thread pool (3 targets x 4 logs x 500 event IDs), after which the store is read back.
+ // The only post-condition asserted is the number of descriptors the store reports (3 * 4).
+ @SuppressWarnings("serial") // NOTE(review): nothing visible in this method seems to need "serial" - confirm
+ @Test(groups = { UNIT })
+ public void testConcurrentLog() throws IOException, InterruptedException {
+ ExecutorService es = Executors.newFixedThreadPool(8);
+ final Map<String, String> props = new HashMap<String, String>();
+ props.put("test", "bar");
+ // Precondition: the store must not report any descriptors before events are written.
+ List<Descriptor> ranges = m_logStore.getDescriptors();
+ assert ranges.isEmpty() : "New store should have no ranges.";
+ for (String target : new String[] { "g1", "g2", "g3" }) {
+ for (long log : new long[] { 1, 2, 3, 5 }) {
+ for (long id = 0; id < 500; id++) {
+ final String t = target; // final copies of the loop variables for capture by the anonymous Runnable
+ final long l = log;
+ final long i = id;
+ es.execute(new Runnable() {
+ @Override
+ public void run() {
+ List<Event> list = new ArrayList<>(); // one single-event list per task
+ list.add(new Event(t, l, i, System.currentTimeMillis(), AuditEvent.FRAMEWORK_STARTED, props));
+ try {
+ m_logStore.put(list);
+ }
+ catch (IOException e) {
+ // NOTE(review): a failed put() is only printed on the worker thread; it does not fail the test directly.
+ e.printStackTrace();
+ }
+
+ }
+ });
+ }
+ }
+ }
+ es.shutdown(); // no new tasks are accepted; tasks already submitted still run
+ es.awaitTermination(60, TimeUnit.SECONDS); // NOTE(review): the boolean result (false on timeout) is ignored
+ int size = m_logStore.getDescriptors().size();
+ assert size == 3 * 4 : "Incorrect amount of ranges returned from store: " + size; // 3 targets x 4 logs
+ List<Event> stored = new ArrayList<Event>();
+ for (Descriptor range : m_logStore.getDescriptors()) {
+ for (Descriptor range2 : m_logStore.getDescriptors(range.getTargetID())) { // NOTE(review): presumably re-reads a target's logs once per outer descriptor, so 'stored' may hold duplicates - confirm
+ stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(), range2.getStoreID())));
+ }
+ }
+ // Collapse the events read back into their distinct string representations. NOTE(review): 'out' is never asserted on.
+ Set<String> out = new HashSet<String>();
+ for (Event event : stored) {
+ out.add(event.toRepresentation());
+ }
+ }
+
+
private void delete(File root) {
if (root.isDirectory()) {
for (File child : root.listFiles()) {