You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/20 03:29:28 UTC
svn commit: r1186596 - in
/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src:
main/java/org/apache/flume/channel/jdbc/impl/
test/java/org/apache/flume/channel/jdbc/
Author: arvind
Date: Thu Oct 20 01:29:27 2011
New Revision: 1186596
URL: http://svn.apache.org/viewvc?rev=1186596&view=rev
Log:
Updated tests for JDBC channel along with bug fixes.
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Thu Oct 20 01:29:27 2011
@@ -302,11 +302,11 @@ public class DerbySchemaHandler implemen
@Override
public boolean schemaExists() {
Connection connection = null;
- ResultSet rset = null;
+ Statement stmt = null;
try {
connection = dataSource.getConnection();
- Statement stmt = connection.createStatement();
- rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
+ stmt = connection.createStatement();
+ ResultSet rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
if (!rset.next()) {
LOGGER.warn("Schema for FLUME does not exist");
return false;
@@ -324,11 +324,11 @@ public class DerbySchemaHandler implemen
}
throw new JdbcChannelException("Unable to query schema", ex);
} finally {
- if (rset != null) {
+ if (stmt != null) {
try {
- rset.close();
+ stmt.close();
} catch (SQLException ex) {
- LOGGER.error("Unable to close result set", ex);
+ LOGGER.error("Unable to close schema lookup stmt", ex);
}
}
if (connection != null) {
@@ -547,7 +547,6 @@ public class DerbySchemaHandler implemen
List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();
-
baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
Statement.RETURN_GENERATED_KEYS);
Iterator<HeaderEntry> it = headers.iterator();
@@ -728,6 +727,10 @@ public class DerbySchemaHandler implemen
spillEventFetchStmt.setLong(1, eventId);
ResultSet rsetSpillEvent = spillEventFetchStmt.executeQuery();
+ if (!rsetSpillEvent.next()) {
+ throw new JdbcChannelException("Payload spill expected but not "
+ + "found for event: " + eventId);
+ }
Blob payloadSpillBlob = rsetSpillEvent.getBlob(1);
payloadInputStream = payloadSpillBlob.getBinaryStream();
ByteArrayOutputStream spillStream = new ByteArrayOutputStream();
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Thu Oct 20 01:29:27 2011
@@ -300,9 +300,11 @@ public class PersistableEvent implements
public PersistableEvent build() {
List<HeaderEntry> bHeaders = new ArrayList<HeaderEntry>();
- for (long headerId : bHeaderParts.keySet()) {
- HeaderPart part = bHeaderParts.get(headerId);
- bHeaders.add(part.getEntry(headerId));
+ if (bHeaderParts != null) {
+ for (long headerId : bHeaderParts.keySet()) {
+ HeaderPart part = bHeaderParts.get(headerId);
+ bHeaders.add(part.getEntry(headerId));
+ }
}
PersistableEvent pe = new PersistableEvent(bEventId, bChannelName,
@@ -312,7 +314,7 @@ public class PersistableEvent implements
bChannelName = null;
bBasePayload = null;
bSpillPayload = null;
- bHeaderParts.clear();
+ bHeaderParts = null;
return pe;
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java Thu Oct 20 01:29:27 2011
@@ -25,10 +25,13 @@ public class MockEvent implements Event
private final byte[] payload;
private final Map<String, String> headers;
+ private final String channel;
- public MockEvent(byte[] payload, Map<String, String> headers) {
+ public MockEvent(byte[] payload, Map<String, String> headers, String channel)
+ {
this.payload = payload;
this.headers = headers;
+ this.channel = channel;
}
@Override
@@ -51,4 +54,8 @@ public class MockEvent implements Event
}
+ public String getChannel() {
+ return channel;
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Thu Oct 20 01:29:27 2011
@@ -17,10 +17,18 @@
*/
package org.apache.flume.channel.jdbc;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public final class MockEventUtils {
+ public static final Logger LOGGER =
+ LoggerFactory.getLogger(MockEventUtils.class);
+
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] CHARS = new String[] {
@@ -49,9 +57,56 @@ public final class MockEventUtils {
return sb.toString();
}
+ /**
+ * Generates a mock event whose payload, header name and header value sizes
+ * are each a random value in [0, threshold) plus the given margin, tagged
+ * with a randomly chosen channel name "test-N" where N is in
+ * [0, numChannels). Random.nextInt(bound) is used because
+ * Math.abs(nextInt()) is negative when nextInt() is Integer.MIN_VALUE.
+ * @param payloadMargin amount added to the random payload size
+ * @param headerNameMargin amount added to each random header name size
+ * @param headerValueMargin amount added to each random header value size
+ * @param numHeaders number of header entries to generate
+ * @param numChannels number of distinct channel names; must be positive
+ * @return a mock event carrying the payload, headers and channel name
+ */
+ public static MockEvent generateMockEvent(int payloadMargin,
+ int headerNameMargin, int headerValueMargin, int numHeaders,
+ int numChannels) {
+
+ int chIndex = RANDOM.nextInt(numChannels);
+ String channel = "test-"+chIndex;
+
+ StringBuilder sb = new StringBuilder("New Event[payload size:");
+
+ int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+ int plSize = RANDOM.nextInt(plTh) + payloadMargin;
+ sb.append(plSize).append(", numHeaders:").append(numHeaders);
+ sb.append(", channel:").append(channel);
+
+ byte[] payload = generatePayload(plSize);
+ int nmTh = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+ int vlTh = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+
+ Map<String, String> headers = new HashMap<String, String>();
+ for (int i = 0; i < numHeaders; i++) {
+ int nmSize = RANDOM.nextInt(nmTh) + headerNameMargin;
+ int vlSize = RANDOM.nextInt(vlTh) + headerValueMargin;
+
+ String name = generateHeaderString(nmSize);
+ String value = generateHeaderString(vlSize);
+
+ headers.put(name, value);
+ sb.append("{nm:").append(nmSize).append(",vl:").append(vlSize);
+ sb.append("} ");
+ }
+
+ LOGGER.debug(sb.toString());
+
+ return new MockEvent(payload, headers, channel);
+ }
+
private MockEventUtils() {
// Disable explicit object creation
}
-
-
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Thu Oct 20 01:29:27 2011
@@ -54,6 +54,65 @@ public class TestDerbySchemaHandlerQueri
+ "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
+ "FLUME.FL_HEADER (FLH_ID))";
+ public static final String EXPECTED_COLUMN_LOOKUP_QUERY
+ = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+ + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+ + "SCHEMANAME = ? ))";
+
+ public static final String EXPECTED_STMT_INSERT_EVENT_BASE
+ = "INSERT INTO FLUME.FL_EVENT (FLE_PAYLOAD, FLE_CHANNEL, FLE_SPILL) "
+ + "VALUES ( ?, ?, ?)";
+
+ public static final String EXPECTED_STMT_INSERT_EVENT_SPILL
+ = "INSERT INTO FLUME.FL_PLSPILL (FLP_EVENT, FLP_SPILL) VALUES ( ?, ?)";
+
+ public static final String EXPECTED_STMT_INSERT_HEADER_BASE
+ = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, "
+ + "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
+
+
+ public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL
+ = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
+
+ public static final String EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL
+ = "INSERT INTO FLUME.FL_VLSPILL (FLV_HEADER, FLV_SPILL) VALUES ( ?, ?)";
+
+ public static final String EXPECTED_STMT_FETCH_PAYLOAD_BASE
+ = "SELECT FLE_ID, FLE_PAYLOAD, FLE_SPILL FROM FLUME.FL_EVENT WHERE "
+ + "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE "
+ + "FLE_CHANNEL = ?)";
+
+
+ public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL
+ = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+ public static final String EXPECTED_STMT_FETCH_HEADER_BASE
+ = "SELECT FLH_ID, FLH_NAME, FLH_VALUE, FLH_NMSPILL, FLH_VLSPILL FROM "
+ + "FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+ public static final String EXPECTED_STMT_FETCH_HEADER_NAME_SPILL
+ = "SELECT FLN_SPILL FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+ public static final String EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL
+ = "SELECT FLV_SPILL FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+ public static final String EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL
+ = "DELETE FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+ public static final String EXPECTED_STMT_DELETE_HEADER_NAME_SPILL
+ = "DELETE FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+ public static final String EXPECTED_STMT_DELETE_EVENT_SPILL
+ = "DELETE FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+ public static final String EXPECTED_STMT_DELETE_HEADER_BASE
+ = "DELETE FROM FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+ public static final String EXPECTED_STMT_DELETE_EVENT_BASE
+ = "DELETE FROM FLUME.FL_EVENT WHERE FLE_ID = ?";
+
+
@Test
public void testCreateQueries() {
@@ -75,6 +134,55 @@ public class TestDerbySchemaHandlerQueri
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
+ EXPECTED_COLUMN_LOOKUP_QUERY);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_BASE,
+ EXPECTED_STMT_INSERT_EVENT_BASE);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_SPILL,
+ EXPECTED_STMT_INSERT_EVENT_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_BASE,
+ EXPECTED_STMT_INSERT_HEADER_BASE);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_NAME_SPILL,
+ EXPECTED_STMT_INSERT_HEADER_NAME_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_VALUE_SPILL,
+ EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_BASE,
+ EXPECTED_STMT_FETCH_PAYLOAD_BASE);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_SPILL,
+ EXPECTED_STMT_FETCH_PAYLOAD_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_BASE,
+ EXPECTED_STMT_FETCH_HEADER_BASE);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_NAME_SPILL,
+ EXPECTED_STMT_FETCH_HEADER_NAME_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_VALUE_SPILL,
+ EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_VALUE_SPILL,
+ EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_NAME_SPILL,
+ EXPECTED_STMT_DELETE_HEADER_NAME_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_SPILL,
+ EXPECTED_STMT_DELETE_EVENT_SPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_BASE,
+ EXPECTED_STMT_DELETE_HEADER_BASE);
+
+ Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_BASE,
+ EXPECTED_STMT_DELETE_EVENT_BASE);
+
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Thu Oct 20 01:29:27 2011
@@ -19,17 +19,20 @@ package org.apache.flume.channel.jdbc;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
-
-import org.junit.Assert;
+import java.util.Set;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
-import org.apache.flume.channel.jdbc.impl.PersistableEvent;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,35 +104,50 @@ public class TestJdbcChannelProvider {
provider = null;
}
+ /**
+ * Creates 1000 events split over 5 channels, persists them, and verifies they are removed in persistence order.
+ */
@Test
public void testPeristingEvents() {
provider = new JdbcChannelProviderImpl();
provider.initialize(derbyProps);
- int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
- int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+ Map<String, List<MockEvent>> eventMap =
+ new HashMap<String, List<MockEvent>>();
- byte[] s1 = MockEventUtils.generatePayload(th - 1);
- Map<String, String> m1 = new HashMap<String, String>();
- m1.put(MockEventUtils.generateHeaderString(1), "one");
- m1.put(MockEventUtils.generateHeaderString(2), "twotwo");
- m1.put(MockEventUtils.generateHeaderString(3), "three");
- m1.put(MockEventUtils.generateHeaderString(100), "ahundred");
- m1.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
- m1.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
- m1.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
- m1.put(MockEventUtils.generateHeaderString(nameLimit), "z");
- m1.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
- m1.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
- m1.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
+ Set<MockEvent> events = new HashSet<MockEvent>();
+ for (int i = 1; i < 1001; i++) {
+ events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5));
+ }
- Event event1 = new MockEvent(s1, m1);
+ Iterator<MockEvent> meIt = events.iterator();
+ while (meIt.hasNext()) {
+ MockEvent me = meIt.next();
+ String chName = me.getChannel();
+ List<MockEvent> eventList = eventMap.get(chName);
+ if (eventList == null) {
+ eventList = new ArrayList<MockEvent>();
+ eventMap.put(chName, eventList);
+ }
+ eventList.add(me);
+ provider.persistEvent(me.getChannel(), me);
+ }
- provider.persistEvent("test", event1);
+ // Now retrieve the events and they should be in the persistence order
- Event event2 = provider.removeEvent("test");
+ for (String chName : eventMap.keySet()) {
+ List<MockEvent> meList = eventMap.get(chName);
+ Iterator<MockEvent> it = meList.iterator();
+ while (it.hasNext()) {
+ MockEvent me = it.next();
+ Event event = provider.removeEvent(chName);
+ assertEquals(me, event);
+ }
- assertEquals(event1, event2);
+ // Now there should be no more events for this channel
+ Event nullEvent = provider.removeEvent(chName);
+ Assert.assertNull(nullEvent);
+ }
provider.close();
provider = null;
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1186596&r1=1186595&r2=1186596&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Thu Oct 20 01:29:27 2011
@@ -97,7 +97,7 @@ public class TestPersistentEvent {
private void runTest(byte[] payload, Map<String, String> headers) {
PersistableEvent pe = new PersistableEvent("test",
- new MockEvent(payload, headers));
+ new MockEvent(payload, headers, null));
Assert.assertArrayEquals(payload, pe.getBody());
Map<String, String> h = pe.getHeaders();
if (h == null) {