You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/21 03:31:59 UTC

svn commit: r1187155 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc: MockEventUtils.java TestJdbcChannelProvider.java

Author: arvind
Date: Fri Oct 21 01:31:59 2011
New Revision: 1187155

URL: http://svn.apache.org/viewvc?rev=1187155&view=rev
Log:
FLUME-728. Added a multi-threaded test to exercise JDBC channel.

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1187155&r1=1187154&r2=1187155&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Fri Oct 21 01:31:59 2011
@@ -106,6 +106,10 @@ public final class MockEventUtils {
     return new MockEvent(payload, headers, channel);
   }
 
+  public static int generateSleepInterval(int upperBound) { // callers sleep this many millis
+    return RANDOM.nextInt(upperBound); // assuming java.util.Random: never negative, no abs()
+  }
+
   private MockEventUtils() {
     // Disable explicit object creation
   }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1187155&r1=1187154&r2=1187155&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 21 01:31:59 2011
@@ -27,6 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
@@ -105,7 +111,64 @@ public class TestJdbcChannelProvider {
   }
 
   /**
-   * creaes 1000 events split over 5 channels, stores them
+   * Creates 1000 events split over 10 channels, stores them via multiple
+   * simulated sources and consumes them via multiple simulated sinks.
+   */
+  @Test
+  public void testEventWithSimulatedSourceAndSinks() throws Exception {
+    provider = new JdbcChannelProviderImpl();
+    provider.initialize(derbyProps);
+
+    Map<String, List<MockEvent>> eventMap =
+        new HashMap<String, List<MockEvent>>();
+    for (int i = 1; i < 1001; i++) {
+      MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10);
+      List<MockEvent> meList = eventMap.get(me.getChannel());
+      if (meList == null) {
+        meList = new ArrayList<MockEvent>();
+        eventMap.put(me.getChannel(), meList);
+      }
+      meList.add(me);
+    }
+
+    // One source and one sink per channel, both given the same event list.
+    List<MockSource> sourceList = new ArrayList<MockSource>();
+    List<MockSink> sinkList = new ArrayList<MockSink>();
+    for (Map.Entry<String, List<MockEvent>> e : eventMap.entrySet()) {
+      sourceList.add(new MockSource(e.getKey(), e.getValue(), provider));
+      sinkList.add(new MockSink(e.getKey(), e.getValue(), provider));
+    }
+
+    ExecutorService sourceExecutor = Executors.newFixedThreadPool(10);
+    ExecutorService sinkExecutor = Executors.newFixedThreadPool(10);
+    try {
+      // invokeAll() returns only once every task has finished or timed out.
+      List<Future<Integer>> srcResults = sourceExecutor.invokeAll(sourceList,
+          300, TimeUnit.SECONDS);
+      Thread.sleep(MockEventUtils.generateSleepInterval(3000));
+      List<Future<Integer>> sinkResults = sinkExecutor.invokeAll(sinkList,
+          300, TimeUnit.SECONDS);
+      int srcCount = 0;
+      for (Future<Integer> srcOutput : srcResults) {
+        srcCount += srcOutput.get();
+      }
+      Assert.assertEquals(1000, srcCount);
+
+      int sinkCount = 0;
+      for (Future<Integer> sinkOutput : sinkResults) {
+        sinkCount += sinkOutput.get();
+      }
+      Assert.assertEquals(1000, sinkCount);
+    } finally {
+      // Pool threads are non-daemon; stop them even when an assertion fails.
+      sourceExecutor.shutdownNow();
+      sinkExecutor.shutdownNow();
+    }
+  }
+
+
+  /**
+   * creates 1000 events split over 5 channels, stores them
    */
   @Test
   public void testPeristingEvents() {
@@ -153,7 +216,7 @@ public class TestJdbcChannelProvider {
     provider = null;
   }
 
-  private void assertEquals(Event e1, Event e2) {
+  private static void assertEquals(Event e1, Event e2) {
     byte[] pl1 = e1.getBody();
     byte[] pl2 = e2.getBody();
 
@@ -185,4 +248,89 @@ public class TestJdbcChannelProvider {
     }
     provider = null;
   }
+
+  /** Simulated sink that drains one channel and verifies each event. */
+  private static class MockSink implements Callable<Integer> {
+
+    private final String channel;
+    // Expected events, in the order the matching source persists them.
+    private final List<MockEvent> events;
+    private final JdbcChannelProviderImpl provider;
+
+    private MockSink(String channel, List<MockEvent> events,
+        JdbcChannelProviderImpl provider) {
+      this.channel = channel;
+      this.events = events;
+      this.provider = provider;
+    }
+
+    /**
+     * Polls the channel until every expected event has been removed.
+     *
+     * @return the number of events consumed
+     * @throws InterruptedException if interrupted while waiting for an event
+     */
+    @Override
+    public Integer call() throws Exception {
+      LOGGER.debug("Sink for channel[" + channel + "]: starting");
+      if (events == null) {
+        return 0;
+      }
+      for (MockEvent expected : events) {
+        Event event = provider.removeEvent(channel);
+        while (event == null) {
+          LOGGER.debug("Sink for channel[" + channel + "]: empty queue");
+          // Let an interrupt abort the task; swallowing it would busy-spin here.
+          Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+          event = provider.removeEvent(channel);
+        }
+        LOGGER.debug("Sink for channel[" + channel + "]: removed event: "
+            + event);
+        TestJdbcChannelProvider.assertEquals(expected, event);
+      }
+
+      LOGGER.debug("Sink for channel[" + channel + "]: retrieved all events");
+
+      return events.size();
+    }
+  }
+
+  /** Simulated source that persists the events of one channel, in order. */
+  private static class MockSource implements Callable<Integer> {
+
+    private final String channel;
+    private final List<MockEvent> events;
+    private final JdbcChannelProviderImpl provider;
+
+    private MockSource(String channel, List<MockEvent> events,
+        JdbcChannelProviderImpl provider) {
+      this.channel = channel;
+      this.events = events;
+      this.provider = provider;
+    }
+
+    /**
+     * Persists every event, pausing a random interval after each one.
+     *
+     * @return the number of events submitted
+     * @throws InterruptedException if interrupted while pausing
+     */
+    @Override
+    public Integer call() throws Exception {
+      LOGGER.debug("Source for channel[" + channel + "]: starting");
+      if (events == null) {
+        return 0;
+      }
+      for (MockEvent me : events) {
+        Assert.assertEquals(channel, me.getChannel());
+        provider.persistEvent(channel, me);
+        // Propagate interrupts so a cancelled source stops submitting.
+        Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+      }
+
+      LOGGER.debug("Source for channel[" + channel + "]: submitted all events");
+      return events.size();
+    }
+  }
+
 }