You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/14 11:13:39 UTC

svn commit: r1183252 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel: ./ src/main/java/org/apache/flume/channel/jdbc/impl/ src/test/java/org/apache/flume/channel/jdbc/

Author: arvind
Date: Fri Oct 14 09:13:39 2011
New Revision: 1183252

URL: http://svn.apache.org/viewvc?rev=1183252&view=rev
Log:
Implemented persistence of events for derby.

Added:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/   (props changed)
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java

Propchange: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 14 09:13:39 2011
@@ -2,3 +2,4 @@
 .project
 .settings
 target
+derby.log

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -17,18 +17,24 @@
  */
 package org.apache.flume.channel.jdbc.impl;
 
+import java.io.ByteArrayInputStream;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import javax.sql.DataSource;
 
 import org.apache.flume.channel.jdbc.ConfigurationConstants;
 import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.HeaderEntry;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.SpillableString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,12 +215,35 @@ public class DerbySchemaHandler implemen
         + "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
         + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
 
-  public static final String COLUMN_LOOKUP_QUERY =
-      "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+  public static final String COLUMN_LOOKUP_QUERY
+      = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
          + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
          + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
          + "SCHEMANAME = ? ))";
 
+  public static final String STMT_INSERT_EVENT_BASE
+      = "INSERT INTO " + TABLE_FL_EVENT + " ("
+          + COLUMN_FLE_PAYLOAD + ", " + COLUMN_FLE_CHANNEL + ", "
+          + COLUMN_FLE_SPILL + ") VALUES ( ?, ?, ?)";
+
+  public static final String STMT_INSERT_EVENT_SPILL
+      = "INSERT INTO " + TABLE_FL_PLSPILL + " ("
+          + COLUMN_FLP_EVENT + ", " + COLUMN_FLP_SPILL + ") VALUES ( ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_BASE
+      = "INSERT INTO " + TABLE_FL_HEADER + " ("
+          + COLUMN_FLH_EVENT + ", " + COLUMN_FLH_NAME + ", " + COLUMN_FLH_VALUE
+          + ", " + COLUMN_FLH_NMSPILL + ", " + COLUMN_FLH_VLSPILL + ") VALUES "
+          + "( ?, ?, ?, ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_NAME_SPILL
+      = "INSERT INTO " + TABLE_FL_NMSPILL + " ("
+          + COLUMN_FLN_HEADER + ", " + COLUMN_FLN_SPILL + ") VALUES ( ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_VALUE_SPILL
+      = "INSERT INTO " + TABLE_FL_VLSPILL + " ("
+          + COLUMN_FLV_HEADER + ", " + COLUMN_FLV_SPILL + ") VALUES ( ?, ?)";
+
   private final DataSource dataSource;
 
   protected DerbySchemaHandler(DataSource dataSource) {
@@ -412,4 +441,201 @@ public class DerbySchemaHandler implemen
       }
     }
   }
+
+  @Override
+  public void persistEvent(PersistableEvent pe, Connection connection) {
+    // First populate the main event table
+    byte[] basePayload = pe.getBasePayload();
+    byte[] spillPayload = pe.getSpillPayload();
+    boolean hasSpillPayload = (spillPayload != null);
+    String channelName = pe.getChannelName();
+
+    LOGGER.debug("Preparing insert event: " + pe);
+
+    PreparedStatement baseEventStmt = null;
+    PreparedStatement spillEventStmt = null;
+    PreparedStatement baseHeaderStmt = null;
+    PreparedStatement headerNameSpillStmt = null;
+    PreparedStatement headerValueSpillStmt = null;
+    try {
+      baseEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_BASE,
+                          Statement.RETURN_GENERATED_KEYS);
+      baseEventStmt.setBytes(1, basePayload);
+      baseEventStmt.setString(2, channelName);
+      baseEventStmt.setBoolean(3, hasSpillPayload);
+
+      int baseEventCount = baseEventStmt.executeUpdate();
+      if (baseEventCount != 1) {
+        throw new JdbcChannelException("Invalid update count on base "
+            + "event insert: " + baseEventCount);
+      }
+      // Extract event ID and set it
+      ResultSet eventIdResult = baseEventStmt.getGeneratedKeys();
+
+      if (!eventIdResult.next()) {
+        throw new JdbcChannelException("Unable to retrieive inserted event-id");
+      }
+
+      long eventId = eventIdResult.getLong(1);
+      pe.setEventId(eventId);
+
+      // Persist the payload spill
+      if (hasSpillPayload) {
+         spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
+         spillEventStmt.setLong(1, eventId);
+         spillEventStmt.setBinaryStream(2,
+             new ByteArrayInputStream(spillPayload), spillPayload.length);
+         int spillEventCount = spillEventStmt.executeUpdate();
+         if (spillEventCount != 1) {
+           throw new JdbcChannelException("Invalid update count on spill "
+               + "event insert: " + spillEventCount);
+         }
+      }
+
+      // Persist the headers
+      List<HeaderEntry> headers = pe.getHeaderEntries();
+      if (headers != null && headers.size() > 0) {
+        List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
+        List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();
+
+
+        baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
+                                Statement.RETURN_GENERATED_KEYS);
+        Iterator<HeaderEntry> it = headers.iterator();
+        while (it.hasNext()) {
+          HeaderEntry entry = it.next();
+          SpillableString name = entry.getName();
+          SpillableString value = entry.getValue();
+          baseHeaderStmt.setLong(1, eventId);
+          baseHeaderStmt.setString(2, name.getBase());
+          baseHeaderStmt.setString(3, value.getBase());
+          baseHeaderStmt.setBoolean(4, name.hasSpill());
+          baseHeaderStmt.setBoolean(5, value.hasSpill());
+
+          int updateCount = baseHeaderStmt.executeUpdate();
+          if (updateCount != 1) {
+             throw new JdbcChannelException("Unexpected update header count: "
+                 + updateCount);
+          }
+          ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
+          if (!headerIdResultSet.next()) {
+            throw new JdbcChannelException(
+                "Unable to retrieve inserted header id");
+          }
+          long headerId = headerIdResultSet.getLong(1);
+          entry.setId(headerId);
+
+          if (name.hasSpill()) {
+            headerWithNameSpill.add(entry);
+          }
+
+          if (value.hasSpill()) {
+            headerWithValueSpill.add(entry);
+          }
+        }
+
+        // Persist header name spills
+        if (headerWithNameSpill.size() > 0) {
+          LOGGER.debug("Number of headers with name spill: "
+                  + headerWithNameSpill.size());
+
+          headerNameSpillStmt =
+              connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL);
+
+          for (HeaderEntry entry : headerWithNameSpill) {
+            String nameSpill = entry.getName().getSpill();
+
+            headerNameSpillStmt.setLong(1, entry.getId());
+            headerNameSpillStmt.setString(2, nameSpill);
+            headerNameSpillStmt.addBatch();
+          }
+
+          int[] nameSpillUpdateCount = headerNameSpillStmt.executeBatch();
+          if (nameSpillUpdateCount.length != headerWithNameSpill.size()) {
+            throw new JdbcChannelException("Unexpected update count for header "
+                + "name spills: expected " + headerWithNameSpill.size() + ", "
+                + "found " + nameSpillUpdateCount.length);
+          }
+
+          for (int i = 0; i < nameSpillUpdateCount.length; i++) {
+            if (nameSpillUpdateCount[i] != 1) {
+              throw new JdbcChannelException("Unexpected update count for "
+                  + "header name spill at position " + i + ", value: "
+                  + nameSpillUpdateCount[i]);
+            }
+          }
+        }
+
+        // Persist header value spills
+        if (headerWithValueSpill.size() > 0) {
+          LOGGER.debug("Number of headers with value spill: "
+              + headerWithValueSpill.size());
+
+          headerValueSpillStmt =
+              connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
+
+          for(HeaderEntry entry : headerWithValueSpill) {
+            String valueSpill = entry.getValue().getSpill();
+
+            headerValueSpillStmt.setLong(1, entry.getId());
+            headerValueSpillStmt.setString(2, valueSpill);
+            headerValueSpillStmt.addBatch();
+          }
+
+          int[] valueSpillUpdateCount = headerValueSpillStmt.executeBatch();
+          if (valueSpillUpdateCount.length != headerWithValueSpill.size()) {
+            throw new JdbcChannelException("Unexpected update count for header "
+                + "value spills: expected " + headerWithValueSpill.size() + ", "
+                + "found " + valueSpillUpdateCount.length);
+          }
+
+          for (int i = 0; i < valueSpillUpdateCount.length; i++) {
+            if (valueSpillUpdateCount[i] != 1) {
+              throw new JdbcChannelException("Unexpected update count for "
+                  + "header value spill at position " + i + ", value: "
+                  + valueSpillUpdateCount[i]);
+            }
+          }
+        }
+      }
+    } catch (SQLException ex) {
+      throw new JdbcChannelException("Unable to persist event: " + pe, ex);
+    } finally {
+      if (baseEventStmt != null) {
+        try {
+          baseEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base event statement", ex);
+        }
+      }
+      if (spillEventStmt != null) {
+        try {
+          spillEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close spill event statement", ex);
+        }
+      }
+      if (baseHeaderStmt != null) {
+        try {
+          baseHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base header statement", ex);
+        }
+      }
+      if (headerNameSpillStmt != null) {
+        try {
+          headerNameSpillStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close header name spill statement", ex);
+        }
+      }
+      if (headerValueSpillStmt != null) {
+        try {
+          headerValueSpillStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close header value spill statement", ex);
+        }
+      }
+    }
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Oct 14 09:13:39 2011
@@ -170,14 +170,15 @@ public class JdbcChannelProviderImpl imp
   }
 
   @Override
-  public void persistEvent(String channelName, Event event) {
-    PersistableEvent persistableEvent = new PersistableEvent(event);
-    Transaction tx = null;
+  public void persistEvent(String channel, Event event) {
+    PersistableEvent persistableEvent = new PersistableEvent(channel, event);
+    JdbcTransactionImpl tx = null;
     try {
       tx = getTransaction();
       tx.begin();
 
       // Persist the persistableEvent
+      schemaHandler.persistEvent(persistableEvent, tx.getConnection());
 
       tx.commit();
     } catch (Exception ex) {
@@ -197,7 +198,7 @@ public class JdbcChannelProviderImpl imp
   }
 
   @Override
-  public Transaction getTransaction() {
+  public JdbcTransactionImpl getTransaction() {
     return txFactory.get();
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Fri Oct 14 09:13:39 2011
@@ -154,4 +154,11 @@ public class JdbcTransactionImpl impleme
       }
     }
   }
+
+  protected Connection getConnection() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    return connection;
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -1,5 +1,7 @@
 package org.apache.flume.channel.jdbc.impl;
 
+import java.sql.Connection;
+
 import javax.sql.DataSource;
 
 public class MySQLSchemaHandler implements SchemaHandler {
@@ -28,4 +30,10 @@ public class MySQLSchemaHandler implemen
 
   }
 
+  @Override
+  public void persistEvent(PersistableEvent pe, Connection connection) {
+    // TODO Auto-generated method stub
+
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Fri Oct 14 09:13:39 2011
@@ -29,20 +29,22 @@ import org.apache.flume.channel.jdbc.Con
 public class PersistableEvent {
 
   private long eventId;
-  private byte[] payload;
-  private byte[] spill;
+  private final String channel;
+  private byte[] basePayload;
+  private byte[] spillPayload;
   private List<HeaderEntry> headers;
 
-  public PersistableEvent(Event event) {
+  public PersistableEvent(String channel, Event event) {
+    this.channel = channel;
 
     byte[] givenPayload = event.getBody();
     if (givenPayload.length < ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD) {
-      payload = Arrays.copyOf(givenPayload, givenPayload.length);
-      spill = null;
+      basePayload = Arrays.copyOf(givenPayload, givenPayload.length);
+      spillPayload = null;
     } else {
-      payload = Arrays.copyOfRange(givenPayload, 0,
+      basePayload = Arrays.copyOfRange(givenPayload, 0,
           ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD);
-      spill = Arrays.copyOfRange(givenPayload,
+      spillPayload = Arrays.copyOfRange(givenPayload,
           ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD, givenPayload.length);
     }
 
@@ -59,12 +61,13 @@ public class PersistableEvent {
 
   public byte[] getPayload() {
     byte[] result = null;
-    if (spill == null) {
-      result = Arrays.copyOf(payload, payload.length);
+    if (spillPayload == null) {
+      result = Arrays.copyOf(basePayload, basePayload.length);
     } else {
-      result = new byte[payload.length + spill.length];
-      System.arraycopy(payload, 0, result, 0, payload.length);
-      System.arraycopy(spill, 0, result, payload.length, spill.length);
+      result = new byte[basePayload.length + spillPayload.length];
+      System.arraycopy(basePayload, 0, result, 0, basePayload.length);
+      System.arraycopy(spillPayload, 0, result,
+          basePayload.length, spillPayload.length);
     }
 
     return result;
@@ -75,35 +78,73 @@ public class PersistableEvent {
     if (headers != null) {
       headerMap =  new HashMap<String, String>();
       for (HeaderEntry entry :  headers) {
-        headerMap.put(entry.getName(), entry.getValue());
+        headerMap.put(entry.getNameString(), entry.getValueString());
       }
     }
 
     return headerMap;
   }
 
-  public static class HeaderEntry {
+  public String getChannelName() {
+    return channel;
+  }
+
+  public byte[] getBasePayload() {
+    return this.basePayload;
+  }
 
-    private SpillableString nameString;
-    private SpillableString valueString;
+  public byte[] getSpillPayload() {
+    return this.spillPayload;
+  }
+
+  protected void setEventId(long eventId) {
+    this.eventId = eventId;
+  }
+
+  public List<HeaderEntry> getHeaderEntries() {
+    return headers;
+  }
+
+  protected static class HeaderEntry {
+
+    private long headerId = -1L;
+    private SpillableString name;
+    private SpillableString value;
 
     public HeaderEntry(String name, String value) {
-      nameString = new SpillableString(name,
+      this.name = new SpillableString(name,
           ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD);
-      valueString = new SpillableString(value,
+      this.value = new SpillableString(value,
           ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD);
     }
 
-    public String getName() {
-      return nameString.getString();
+    public String getNameString() {
+      return name.getString();
     }
 
-    public String getValue() {
-      return valueString.getString();
+    public SpillableString getName() {
+      return name;
     }
+
+    public String getValueString() {
+      return value.getString();
+    }
+
+    public SpillableString getValue() {
+      return value;
+    }
+
+    protected void setId(long headerId) {
+      this.headerId = headerId;
+    }
+
+    public long getId() {
+      return headerId;
+    }
+
   }
 
-  private static class SpillableString {
+  protected static class SpillableString {
 
     private String base;
     private String spill;
@@ -144,5 +185,9 @@ public class PersistableEvent {
       }
       return base + spill;
     }
+
+    public boolean hasSpill() {
+      return spill != null;
+    }
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -17,7 +17,7 @@
  */
 package org.apache.flume.channel.jdbc.impl;
 
-import javax.sql.DataSource;
+import java.sql.Connection;
 
 /**
  * <p>A handler for creating and validating database schema for use by
@@ -42,4 +42,14 @@ public interface SchemaHandler {
    * @param connection the connection to create schema for.
    */
   public void createSchemaObjects();
+
+  /**
+   * Inserts the given persistent event into the database. The connection that
+   * is passed into the handler has an ongoing transaction and therefore the
+   * SchemaHandler implementation must not close the connection.
+   *
+   * @param pe the event to persist
+   * @param connection the connection to use
+   */
+  public void persistEvent(PersistableEvent pe, Connection connection);
 }

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java?rev=1183252&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java Fri Oct 14 09:13:39 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+public class MockEvent implements Event {
+
+  private final byte[] payload;
+  private final Map<String, String> headers;
+
+  public MockEvent(byte[] payload, Map<String, String> headers) {
+    this.payload = payload;
+    this.headers = headers;
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+
+  }
+
+  @Override
+  public byte[] getBody() {
+    return payload;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1183252&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Fri Oct 14 09:13:39 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Random;
+
+public final class MockEventUtils {
+
+  private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+  private static final String[] CHARS = new String[] {
+    "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
+    "s","t","u","v","w","x","y","z",
+    "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
+    "S","T","U","V","W","X","Y","Z",
+    "0","1","2","3","4","5","6","7","8","9",
+    "!","@","#","$","%","^","&","*","(",")",
+    "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
+  };
+
+  public static byte[] generatePayload(int size) {
+    byte[] result = new byte[size];
+    RANDOM.nextBytes(result);
+    return result;
+  }
+
+  public static String generateHeaderString(int size) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      int x = Math.abs(RANDOM.nextInt());
+      int y = x % CHARS.length;
+      sb.append(CHARS[y]);
+    }
+    return sb.toString();
+  }
+
+  private MockEventUtils() {
+    // Disable explicit object creation
+  }
+
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 14 09:13:39 2011
@@ -1,11 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.channel.jdbc;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import junit.framework.Assert;
 
+import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
 import org.junit.After;
@@ -80,6 +100,36 @@ public class TestJdbcChannelProvider {
     provider = null;
   }
 
+  @Test
+  public void testPeristingEvents() {
+    provider = new JdbcChannelProviderImpl();
+    provider.initialize(derbyProps);
+
+    int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+    int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+    byte[] s1 = MockEventUtils.generatePayload(th - 1);
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put(MockEventUtils.generateHeaderString(1), "one");
+    m1.put(MockEventUtils.generateHeaderString(2), "twotwo");
+    m1.put(MockEventUtils.generateHeaderString(3), "three");
+    m1.put(MockEventUtils.generateHeaderString(100), "ahundred");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit), "z");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
+    m1.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
+
+    Event event = new MockEvent(s1, m1);
+
+    provider.persistEvent("test", event);
+
+    provider.close();
+    provider = null;
+  }
+
   @After
   public void tearDown() throws IOException {
     if (provider != null) {

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Fri Oct 14 09:13:39 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.channel.jdbc;
 
 import java.util.HashMap;
@@ -15,105 +32,80 @@ public class TestPersistentEvent {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TestPersistentEvent.class);
 
-  private static final String[] CHARS = new String[] {
-    "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
-    "s","t","u","v","w","x","y","z",
-    "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
-    "S","T","U","V","W","X","Y","Z",
-    "0","1","2","3","4","5","6","7","8","9",
-    "!","@","#","$","%","^","&","*","(",")",
-    "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
-  };
+
 
   @Test
   public void testMarshalling() {
-    Random rnd = new Random(System.currentTimeMillis());
 
     int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
     int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
 
-    byte[] s1 = new byte[1];
-    rnd.nextBytes(s1);
+    byte[] s1 = MockEventUtils.generatePayload(1);
     runTest(s1, null);
 
-    byte[] s2 = new byte[2];
-    rnd.nextBytes(s2);
+    byte[] s2 = MockEventUtils.generatePayload(2);
     runTest(s2, new HashMap<String, String>());
 
-    byte[] s3 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 2];
-    rnd.nextBytes(s3);
+    int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+    byte[] s3 = MockEventUtils.generatePayload(th - 2);
     Map<String, String> m3 = new HashMap<String, String>();
-    m3.put(generateString(rnd, 1), generateString(rnd, 1));
+    m3.put(MockEventUtils.generateHeaderString(1),
+        MockEventUtils.generateHeaderString(1));
     runTest(s3, m3);
 
-    byte[] s4 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 1];
-    rnd.nextBytes(s4);
+    byte[] s4 = MockEventUtils.generatePayload(th - 1);
     Map<String, String> m4 = new HashMap<String, String>();
-    m4.put(generateString(rnd, nameLimit - 21), "w");
-    m4.put(generateString(rnd, nameLimit - 2), "x");
-    m4.put(generateString(rnd, nameLimit - 1), "y");
-    m4.put(generateString(rnd, nameLimit), "z");
-    m4.put(generateString(rnd, nameLimit + 1), "a");
-    m4.put(generateString(rnd, nameLimit + 2), "b");
-    m4.put(generateString(rnd, nameLimit + 21), "c");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit), "z");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
     runTest(s4, m4);
 
-    byte[] s5 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD];
-    rnd.nextBytes(s5);
+    byte[] s5 = MockEventUtils.generatePayload(th);
     Map<String, String> m5 = new HashMap<String, String>();
-    m5.put("w", generateString(rnd, valLimit - 21));
-    m5.put("x", generateString(rnd, valLimit - 2));
-    m5.put("y", generateString(rnd, valLimit - 1));
-    m5.put("z", generateString(rnd, valLimit));
-    m5.put("a", generateString(rnd, valLimit + 1));
-    m5.put("b", generateString(rnd, valLimit + 2));
-    m5.put("c", generateString(rnd, valLimit + 21));
+    m5.put("w", MockEventUtils.generateHeaderString(valLimit - 21));
+    m5.put("x", MockEventUtils.generateHeaderString(valLimit - 2));
+    m5.put("y", MockEventUtils.generateHeaderString(valLimit - 1));
+    m5.put("z", MockEventUtils.generateHeaderString(valLimit));
+    m5.put("a", MockEventUtils.generateHeaderString(valLimit + 1));
+    m5.put("b", MockEventUtils.generateHeaderString(valLimit + 2));
+    m5.put("c", MockEventUtils.generateHeaderString(valLimit + 21));
     runTest(s5, m5);
 
-    byte[] s6 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 1];
-    rnd.nextBytes(s6);
+    byte[] s6 = MockEventUtils.generatePayload(th + 1);
     Map<String, String> m6 = new HashMap<String, String>();
-    m6.put(generateString(rnd, nameLimit - 21),
-           generateString(rnd, valLimit - 21));
-    m6.put(generateString(rnd, nameLimit - 2),
-        generateString(rnd, valLimit - 2));
-    m6.put(generateString(rnd, nameLimit - 1),
-        generateString(rnd, valLimit - 1));
-    m6.put(generateString(rnd, nameLimit),
-        generateString(rnd, valLimit));
-    m6.put(generateString(rnd, nameLimit + 1),
-        generateString(rnd, valLimit + 1));
-    m6.put(generateString(rnd, nameLimit + 2),
-        generateString(rnd, valLimit + 2));
-    m6.put(generateString(rnd, nameLimit + 21),
-        generateString(rnd, valLimit + 21));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 21),
+           MockEventUtils.generateHeaderString(valLimit - 21));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 2),
+           MockEventUtils.generateHeaderString(valLimit - 2));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 1),
+           MockEventUtils.generateHeaderString(valLimit - 1));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit),
+           MockEventUtils.generateHeaderString(valLimit));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 1),
+           MockEventUtils.generateHeaderString(valLimit + 1));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 2),
+           MockEventUtils.generateHeaderString(valLimit + 2));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 21),
+           MockEventUtils.generateHeaderString(valLimit + 21));
 
     runTest(s6, m6);
 
-    byte[] s7 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 2];
-    rnd.nextBytes(s7);
+    byte[] s7 = MockEventUtils.generatePayload(th + 2);
     runTest(s7, null);
 
-    byte[] s8 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 27];
-    rnd.nextBytes(s8);
+    byte[] s8 = MockEventUtils.generatePayload(th + 27);
     runTest(s8, null);
   }
 
-  private String generateString(Random rnd, int size) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < size; i++) {
-      int x = Math.abs(rnd.nextInt());
-      int y = x % CHARS.length;
-      sb.append(CHARS[y]);
-    }
-    System.out.println("String: " + sb);
-    return sb.toString();
-  }
-
-
 
   private void runTest(byte[] payload, Map<String, String> headers) {
-    PersistableEvent pe = new PersistableEvent(new MockEvent(payload, headers));
+    PersistableEvent pe = new PersistableEvent("test",
+        new MockEvent(payload, headers));
     Assert.assertArrayEquals(payload, pe.getPayload());
     Map<String, String> h = pe.getHeaders();
     if (h == null) {
@@ -129,38 +121,4 @@ public class TestPersistentEvent {
       Assert.assertTrue(headers.size() == 0);
     }
   }
-
-
-
-  private static class MockEvent implements Event {
-
-    private final byte[] payload;
-    private final Map<String, String> headers;
-
-    private MockEvent(byte[] payload, Map<String, String> headers) {
-      this.payload = payload;
-      this.headers = headers;
-    }
-
-    @Override
-    public Map<String, String> getHeaders() {
-      return headers;
-    }
-
-    @Override
-    public void setHeaders(Map<String, String> headers) {
-
-    }
-
-    @Override
-    public byte[] getBody() {
-      return payload;
-    }
-
-    @Override
-    public void setBody(byte[] body) {
-
-    }
-
-  }
 }