You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/21 03:31:59 UTC
svn commit: r1187155 - in
/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc:
MockEventUtils.java TestJdbcChannelProvider.java
Author: arvind
Date: Fri Oct 21 01:31:59 2011
New Revision: 1187155
URL: http://svn.apache.org/viewvc?rev=1187155&view=rev
Log:
FLUME-728. Added a multi-threaded test to exercise JDBC channel.
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1187155&r1=1187154&r2=1187155&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Fri Oct 21 01:31:59 2011
@@ -106,6 +106,10 @@ public final class MockEventUtils {
return new MockEvent(payload, headers, channel);
}
+ /**
+  * Picks a pseudo-random interval, used by the tests as a
+  * {@code Thread.sleep} duration to pace simulated sources and sinks.
+  *
+  * @param upperBound bound handed to {@code RANDOM.nextInt(int)}
+  * @return the value drawn from {@code RANDOM.nextInt(upperBound)}
+  */
+ public static int generateSleepInterval(int upperBound) {
+   // NOTE(review): RANDOM is declared outside this hunk; assuming it is a
+   // java.util.Random, nextInt(bound) already yields a value in [0, bound),
+   // so wrapping it in Math.abs was a no-op and has been dropped.
+   return RANDOM.nextInt(upperBound);
+ }
+
private MockEventUtils() {
// Disable explicit object creation
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1187155&r1=1187154&r2=1187155&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 21 01:31:59 2011
@@ -27,6 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
@@ -105,7 +111,64 @@ public class TestJdbcChannelProvider {
}
/**
- * creaes 1000 events split over 5 channels, stores them
+ * Creates 1000 events split over 10 channels, stores them via multiple
+ * simulated sources and consumes them via multiple simulated sinks.
+ */
+ @Test
+ public void testEventWithSimulatedSourceAndSinks() throws Exception {
+   provider = new JdbcChannelProviderImpl();
+   provider.initialize(derbyProps);
+
+   // Group the generated events by the channel each event reports.
+   Map<String, List<MockEvent>> eventMap =
+       new HashMap<String, List<MockEvent>>();
+
+   for (int i = 1; i < 1001; i++) {
+     MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10);
+     List<MockEvent> meList = eventMap.get(me.getChannel());
+     if (meList == null) {
+       meList = new ArrayList<MockEvent>();
+       eventMap.put(me.getChannel(), meList);
+     }
+     meList.add(me);
+   }
+
+   // One source and one sink per channel, both working off the same list so
+   // the sink can compare what it removes against what the source stored.
+   List<MockSource> sourceList = new ArrayList<MockSource>();
+   List<MockSink> sinkList = new ArrayList<MockSink>();
+
+   for (Map.Entry<String, List<MockEvent>> entry : eventMap.entrySet()) {
+     String channel = entry.getKey();
+     List<MockEvent> meList = entry.getValue();
+     sourceList.add(new MockSource(channel, meList, provider));
+     sinkList.add(new MockSink(channel, meList, provider));
+   }
+
+   ExecutorService sourceExecutor = Executors.newFixedThreadPool(10);
+   ExecutorService sinkExecutor = Executors.newFixedThreadPool(10);
+
+   try {
+     // invokeAll returns only once every task has completed or the timeout
+     // has expired, so the sinks are started after the sources are done.
+     List<Future<Integer>> srcResults = sourceExecutor.invokeAll(sourceList,
+         300, TimeUnit.SECONDS);
+     Thread.sleep(MockEventUtils.generateSleepInterval(3000));
+     List<Future<Integer>> sinkResults = sinkExecutor.invokeAll(sinkList,
+         300, TimeUnit.SECONDS);
+
+     int srcCount = 0;
+     for (Future<Integer> srcOutput : srcResults) {
+       srcCount += srcOutput.get();
+     }
+
+     Assert.assertEquals(1000, srcCount);
+
+     int sinkCount = 0;
+     for (Future<Integer> sinkOutput : sinkResults) {
+       sinkCount += sinkOutput.get();
+     }
+
+     Assert.assertEquals(1000, sinkCount);
+   } finally {
+     // Always release the worker threads, even when an assertion fails or a
+     // task was cancelled by the timeout; otherwise the pools outlive the test.
+     sourceExecutor.shutdownNow();
+     sinkExecutor.shutdownNow();
+   }
+ }
+
+
+ /**
+ * creates 1000 events split over 5 channels, stores them
*/
@Test
public void testPeristingEvents() {
@@ -153,7 +216,7 @@ public class TestJdbcChannelProvider {
provider = null;
}
- private void assertEquals(Event e1, Event e2) {
+ private static void assertEquals(Event e1, Event e2) {
byte[] pl1 = e1.getBody();
byte[] pl2 = e2.getBody();
@@ -185,4 +248,89 @@ public class TestJdbcChannelProvider {
}
provider = null;
}
+
+ /**
+  * Simulated sink: removes events for one channel from the provider and
+  * checks each of them against the expected list, in list order.
+  */
+ private static class MockSink implements Callable<Integer> {
+
+   private final String channel;
+   private final List<MockEvent> events;
+   private final JdbcChannelProviderImpl provider;
+
+   private MockSink(String channel, List<MockEvent> events,
+       JdbcChannelProviderImpl provider) {
+     this.channel = channel;
+     this.events = events;
+     this.provider = provider;
+   }
+
+   /**
+    * Removes one event per expected entry, polling with a random pause
+    * while the provider returns {@code null} for the channel.
+    *
+    * @return the number of expected events, or 0 when the list is null
+    * @throws InterruptedException if the thread is interrupted while
+    *         pausing between polls
+    */
+   @Override
+   public Integer call() throws Exception {
+     LOGGER.debug("Sink for channel[" + channel + "]: starting");
+     if (events == null) {
+       return 0;
+     }
+     for (MockEvent expected : events) {
+       Event event = provider.removeEvent(channel);
+       while (event == null) {
+         LOGGER.debug("Sink for channel[" + channel + "]: empty queue");
+         // InterruptedException is deliberately not caught here. Swallowing
+         // it and re-setting the flag made every following sleep throw at
+         // once, turning this wait into a busy loop that a cancelled task
+         // could never leave.
+         Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+         event = provider.removeEvent(channel);
+       }
+       LOGGER.debug("Sink for channel[" + channel + "]: removed event: "
+           + event);
+       TestJdbcChannelProvider.assertEquals(expected, event);
+     }
+
+     LOGGER.debug("Sink for channel[" + channel + "]: retrieved all events");
+
+     return events.size();
+   }
+ }
+
+ /**
+  * Simulated source: persists a fixed list of events to one channel through
+  * the provider, pausing a random interval after each event.
+  */
+ private static class MockSource implements Callable<Integer> {
+
+   private final String channel;
+   private final List<MockEvent> events;
+   private final JdbcChannelProviderImpl provider;
+
+   private MockSource(String channel, List<MockEvent> events,
+       JdbcChannelProviderImpl provider) {
+     this.channel = channel;
+     this.events = events;
+     this.provider = provider;
+   }
+
+   /**
+    * Persists every event in list order, asserting first that the event
+    * belongs to this source's channel.
+    *
+    * @return the number of events in the list, or 0 when the list is null
+    * @throws InterruptedException if the thread is interrupted while
+    *         pausing between events
+    */
+   @Override
+   public Integer call() throws Exception {
+     LOGGER.debug("Source for channel[" + channel + "]: starting");
+     if (events == null) {
+       return 0;
+     }
+     for (MockEvent me : events) {
+       Assert.assertEquals(channel, me.getChannel());
+       provider.persistEvent(channel, me);
+       // InterruptedException is deliberately not caught here. Swallowing
+       // it and re-setting the flag made a cancelled source keep persisting
+       // the remaining events with no pause instead of stopping.
+       Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+     }
+
+     LOGGER.debug("Source for channel[" + channel + "]: submitted all events");
+
+     return events.size();
+   }
+
+ }
+
}