You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/20 03:29:28 UTC

svn commit: r1186596 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src: main/java/org/apache/flume/channel/jdbc/impl/ test/java/org/apache/flume/channel/jdbc/

Author: arvind
Date: Thu Oct 20 01:29:27 2011
New Revision: 1186596

URL: http://svn.apache.org/viewvc?rev=1186596&view=rev
Log:
Updated tests for JDBC channel along with bug fixes.

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Thu Oct 20 01:29:27 2011
@@ -302,11 +302,11 @@ public class DerbySchemaHandler implemen
   @Override
   public boolean schemaExists() {
     Connection connection = null;
-    ResultSet rset = null;
+    Statement stmt = null;
     try {
       connection = dataSource.getConnection();
-      Statement stmt = connection.createStatement();
-      rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
+      stmt = connection.createStatement();
+      ResultSet  rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
       if (!rset.next()) {
         LOGGER.warn("Schema for FLUME does not exist");
         return false;
@@ -324,11 +324,11 @@ public class DerbySchemaHandler implemen
       }
       throw new JdbcChannelException("Unable to query schema", ex);
     } finally {
-      if (rset != null) {
+      if (stmt != null) {
         try {
-          rset.close();
+          stmt.close();
         } catch (SQLException ex) {
-          LOGGER.error("Unable to close result set", ex);
+          LOGGER.error("Unable to close schema lookup stmt", ex);
         }
       }
       if (connection != null) {
@@ -547,7 +547,6 @@ public class DerbySchemaHandler implemen
         List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
         List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();
 
-
         baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
                                 Statement.RETURN_GENERATED_KEYS);
         Iterator<HeaderEntry> it = headers.iterator();
@@ -728,6 +727,10 @@ public class DerbySchemaHandler implemen
 
         spillEventFetchStmt.setLong(1, eventId);
         ResultSet rsetSpillEvent = spillEventFetchStmt.executeQuery();
+        if (!rsetSpillEvent.next()) {
+          throw new JdbcChannelException("Payload spill expected but not "
+              + "found for event: " + eventId);
+        }
         Blob payloadSpillBlob = rsetSpillEvent.getBlob(1);
         payloadInputStream = payloadSpillBlob.getBinaryStream();
         ByteArrayOutputStream spillStream = new ByteArrayOutputStream();

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Thu Oct 20 01:29:27 2011
@@ -300,9 +300,11 @@ public class PersistableEvent implements
 
     public PersistableEvent build() {
       List<HeaderEntry> bHeaders = new ArrayList<HeaderEntry>();
-      for (long headerId : bHeaderParts.keySet()) {
-        HeaderPart part = bHeaderParts.get(headerId);
-        bHeaders.add(part.getEntry(headerId));
+      if (bHeaderParts != null) {
+        for (long headerId : bHeaderParts.keySet()) {
+          HeaderPart part = bHeaderParts.get(headerId);
+          bHeaders.add(part.getEntry(headerId));
+        }
       }
 
       PersistableEvent pe = new PersistableEvent(bEventId, bChannelName,
@@ -312,7 +314,7 @@ public class PersistableEvent implements
       bChannelName = null;
       bBasePayload = null;
       bSpillPayload = null;
-      bHeaderParts.clear();
+      bHeaderParts = null;
 
       return pe;
     }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java Thu Oct 20 01:29:27 2011
@@ -25,10 +25,13 @@ public class MockEvent implements Event 
 
   private final byte[] payload;
   private final Map<String, String> headers;
+  private final String channel;
 
-  public MockEvent(byte[] payload, Map<String, String> headers) {
+  public MockEvent(byte[] payload, Map<String, String> headers, String channel)
+  {
     this.payload = payload;
     this.headers = headers;
+    this.channel = channel;
   }
 
   @Override
@@ -51,4 +54,8 @@ public class MockEvent implements Event 
 
   }
 
+  public String getChannel() {
+    return channel;
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Thu Oct 20 01:29:27 2011
@@ -17,10 +17,18 @@
  */
 package org.apache.flume.channel.jdbc;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public final class MockEventUtils {
 
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(MockEventUtils.class);
+
   private static final Random RANDOM = new Random(System.currentTimeMillis());
 
   private static final String[] CHARS = new String[] {
@@ -49,9 +57,56 @@ public final class MockEventUtils {
     return sb.toString();
   }
 
+  /**
+   * Generates a mock event using the specified margins that are offset from
+   * the threshold values of the various sizes. Also the number of headers is
+   * specified along with number of channels. The last parameter - numChannels
+   * is used to calculate a channel name that will be used to tag the event
+   * with.
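+   * @param payloadMargin offset added to a random payload size drawn below PAYLOAD_LENGTH_THRESHOLD
+   * @param headerNameMargin offset added to a random header name size drawn below HEADER_NAME_LENGTH_THRESHOLD
+   * @param headerValueMargin offset added to a random header value size drawn below HEADER_VALUE_LENGTH_THRESHOLD
+   * @param numHeaders number of header name/value pairs to generate for the event
+   * @param numChannels number of channels; the event is tagged "test-" plus a random index modulo this value
+   * @return a mock event carrying the generated payload, headers and channel name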
+   */
+  public static MockEvent generateMockEvent(int payloadMargin,
+      int headerNameMargin,	int headerValueMargin, int numHeaders,
+      int numChannels) {
+
+    int chIndex = Math.abs(RANDOM.nextInt())%numChannels;
+    String channel = "test-"+chIndex;
+
+    StringBuilder sb = new StringBuilder("New Event[payload size:");
+
+    int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+    int plSize = Math.abs(RANDOM.nextInt())%plTh + payloadMargin;
+    sb.append(plSize).append(", numHeaders:").append(numHeaders);
+    sb.append(", channel:").append(channel);
+
+    byte[] payload = generatePayload(plSize);
+    int nmTh = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+    int vlTh = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+
+    Map<String, String> headers = new HashMap<String, String>();
+    for (int i = 0; i < numHeaders; i++) {
+      int nmSize = Math.abs(RANDOM.nextInt())%nmTh + headerNameMargin;
+      int vlSize = Math.abs(RANDOM.nextInt())%vlTh + headerValueMargin;
+
+      String name = generateHeaderString(nmSize);
+      String value = generateHeaderString(vlSize);
+
+      headers.put(name, value);
+      sb.append("{nm:").append(nmSize).append(",vl:").append(vlSize);
+      sb.append("} ");
+    }
+
+    LOGGER.debug(sb.toString());
+
+    return new MockEvent(payload, headers, channel);
+  }
+
   private MockEventUtils() {
     // Disable explicit object creation
   }
-
-
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Thu Oct 20 01:29:27 2011
@@ -54,6 +54,65 @@ public class TestDerbySchemaHandlerQueri
             + "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
             + "FLUME.FL_HEADER (FLH_ID))";
 
+  public static final String EXPECTED_COLUMN_LOOKUP_QUERY
+      = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+          + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+          + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+          + "SCHEMANAME = ? ))";
+
+  public static final String EXPECTED_STMT_INSERT_EVENT_BASE
+      = "INSERT INTO FLUME.FL_EVENT (FLE_PAYLOAD, FLE_CHANNEL, FLE_SPILL) "
+          + "VALUES ( ?, ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_EVENT_SPILL
+      = "INSERT INTO FLUME.FL_PLSPILL (FLP_EVENT, FLP_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_BASE
+      = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, "
+          + "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
+
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL
+      = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL
+      = "INSERT INTO FLUME.FL_VLSPILL (FLV_HEADER, FLV_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_FETCH_PAYLOAD_BASE
+      = "SELECT FLE_ID, FLE_PAYLOAD, FLE_SPILL FROM FLUME.FL_EVENT WHERE "
+          + "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE "
+          + "FLE_CHANNEL = ?)";
+
+
+  public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL
+      = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_BASE
+      = "SELECT FLH_ID, FLH_NAME, FLH_VALUE, FLH_NMSPILL, FLH_VLSPILL FROM "
+          + "FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_NAME_SPILL
+      = "SELECT FLN_SPILL FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL
+      = "SELECT FLV_SPILL FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL
+      = "DELETE FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_NAME_SPILL
+      = "DELETE FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_EVENT_SPILL
+      = "DELETE FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_BASE
+      = "DELETE FROM FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+  public static final String EXPECTED_STMT_DELETE_EVENT_BASE
+      = "DELETE FROM FLUME.FL_EVENT WHERE FLE_ID = ?";
+
+
   @Test
   public void testCreateQueries() {
 
@@ -75,6 +134,55 @@ public class TestDerbySchemaHandlerQueri
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
         EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
 
+
+    Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
+        EXPECTED_COLUMN_LOOKUP_QUERY);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_BASE,
+        EXPECTED_STMT_INSERT_EVENT_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_SPILL,
+        EXPECTED_STMT_INSERT_EVENT_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_BASE,
+        EXPECTED_STMT_INSERT_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_NAME_SPILL,
+        EXPECTED_STMT_INSERT_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_BASE,
+        EXPECTED_STMT_FETCH_PAYLOAD_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_SPILL,
+        EXPECTED_STMT_FETCH_PAYLOAD_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_BASE,
+        EXPECTED_STMT_FETCH_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_NAME_SPILL,
+        EXPECTED_STMT_FETCH_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_NAME_SPILL,
+        EXPECTED_STMT_DELETE_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_SPILL,
+        EXPECTED_STMT_DELETE_EVENT_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_BASE,
+        EXPECTED_STMT_DELETE_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_BASE,
+        EXPECTED_STMT_DELETE_EVENT_BASE);
+
   }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Thu Oct 20 01:29:27 2011
@@ -19,17 +19,20 @@ package org.apache.flume.channel.jdbc;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
-import org.junit.Assert;
+import java.util.Set;
 
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
-import org.apache.flume.channel.jdbc.impl.PersistableEvent;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,35 +104,50 @@ public class TestJdbcChannelProvider {
     provider = null;
   }
 
+  /**
+   * Creates 1000 events split over 5 channels, stores them, and reads them back.
+   */
   @Test
   public void testPeristingEvents() {
     provider = new JdbcChannelProviderImpl();
     provider.initialize(derbyProps);
 
-    int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
-    int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+    Map<String, List<MockEvent>> eventMap =
+        new HashMap<String, List<MockEvent>>();
 
-    byte[] s1 = MockEventUtils.generatePayload(th - 1);
-    Map<String, String> m1 = new HashMap<String, String>();
-    m1.put(MockEventUtils.generateHeaderString(1), "one");
-    m1.put(MockEventUtils.generateHeaderString(2), "twotwo");
-    m1.put(MockEventUtils.generateHeaderString(3), "three");
-    m1.put(MockEventUtils.generateHeaderString(100), "ahundred");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit), "z");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
-    m1.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
+    Set<MockEvent> events = new HashSet<MockEvent>();
+    for (int i = 1; i < 1001; i++) {
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5));
+    }
 
-    Event event1 = new MockEvent(s1, m1);
+    Iterator<MockEvent> meIt = events.iterator();
+    while (meIt.hasNext()) {
+      MockEvent me = meIt.next();
+      String chName = me.getChannel();
+      List<MockEvent> eventList = eventMap.get(chName);
+      if (eventList == null) {
+        eventList = new ArrayList<MockEvent>();
+        eventMap.put(chName, eventList);
+      }
+      eventList.add(me);
+      provider.persistEvent(me.getChannel(), me);
+    }
 
-    provider.persistEvent("test", event1);
+    // Now retrieve the events and they should be in the persistence order
 
-    Event event2 = provider.removeEvent("test");
+    for (String chName : eventMap.keySet()) {
+      List<MockEvent> meList = eventMap.get(chName);
+      Iterator<MockEvent> it = meList.iterator();
+      while (it.hasNext()) {
+        MockEvent me = it.next();
+        Event event = provider.removeEvent(chName);
+        assertEquals(me, event);
+      }
 
-    assertEquals(event1, event2);
+      // Now there should be no more events for this channel
+      Event nullEvent = provider.removeEvent(chName);
+      Assert.assertNull(nullEvent);
+    }
 
     provider.close();
     provider = null;

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Thu Oct 20 01:29:27 2011
@@ -97,7 +97,7 @@ public class TestPersistentEvent {
 
   private void runTest(byte[] payload, Map<String, String> headers) {
     PersistableEvent pe = new PersistableEvent("test",
-        new MockEvent(payload, headers));
+        new MockEvent(payload, headers, null));
     Assert.assertArrayEquals(payload, pe.getBody());
     Map<String, String> h = pe.getHeaders();
     if (h == null) {