You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/13 10:00:59 UTC

svn commit: r1182709 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src: main/java/org/apache/flume/channel/jdbc/ main/java/org/apache/flume/channel/jdbc/impl/ test/java/org/apache/flume/channel/jdbc/

Author: arvind
Date: Thu Oct 13 08:00:59 2011
New Revision: 1182709

URL: http://svn.apache.org/viewvc?rev=1182709&view=rev
Log:
Work related to JDBC channel implementation

Added:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java Thu Oct 13 08:00:59 2011
@@ -51,6 +51,43 @@ public final class ConfigurationConstant
   public static final String CONFIG_MAX_CONNECTION =
       PREFIX + "maximum.connections";
 
+  // Built in constants for JDBC Channel implementation
+
+  /**
+   * The length for payload bytes that will be stored inline. Payloads larger
+   * than this length will spill into BLOB.
+   */
+  public static final int PAYLOAD_LENGTH_THRESHOLD = 16384; // 16kb
+
+  /**
+   * The length of header name in bytes that will be stored inline. Header
+   * names longer than this number will spill over into CLOB.
+   */
+  public static final int HEADER_NAME_LENGTH_THRESHOLD = 251;
+
+  /**
+   * The length of header value in bytes that will be stored inline. Header
+   * values longer than this number will spill over into CLOB.
+   */
+  public static final int HEADER_VALUE_LENGTH_THRESHOLD = 251;
+
+  /**
+   * The maximum length of channel name.
+   */
+  public static final int CHANNEL_NAME_MAX_LENGTH = 64;
+
+  /**
+   * The maximum spill size for header names. Together with the value of
+   * HEADER_NAME_LENGTH_THRESHOLD this adds up to 32kb.
+   */
+  public static final int HEADER_NAME_SPILL_MAX_LENGTH = 32517;
+
+  /**
+   * The maximum spill size for header values. Together with the value of
+   * HEADER_VALUE_LENGTH_THRESHOLD, this adds up to 32kb.
+   */
+  public static final int HEADER_VALUE_SPILL_MAX_LENGTH = 32517;
+
   private ConfigurationConstants() {
     // Disable object creation
   }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Thu Oct 13 08:00:59 2011
@@ -27,10 +27,90 @@ import java.util.Set;
 
 import javax.sql.DataSource;
 
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
 import org.apache.flume.channel.jdbc.JdbcChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * <p>
+ * Schema handler for Derby Database. This handler uses the following schema:
+ * </p>
+ *
+ * <p><strong><tt>FL_EVENT</tt></strong>: The main event table. This table
+ * contains an auto-generated event ID along with the first 16kb of payload
+ * data. If the payload is larger than 16kb, a spill indicator flag is set and
+ * the remaining data is recorded in the <tt>FL_PLSPILL</tt> table.</p>
+ * <pre>
+ * +-------------------------------+
+ * | FL_EVENT                      |
+ * +-------------------------------+
+ * | FLE_ID     : BIGINT PK        | (auto-gen sequence)
+ * | FLE_PAYLOAD: VARBINARY(16384) | (16kb payload)
+ * | FLE_SPILL  : BOOLEAN          | (true if payload spills)
+ * | FLE_CHANNEL: VARCHAR(64)      |
+ * +-------------------------------+
+ * </pre>
+ *
+ * <p><strong><tt>FL_PLSPILL</tt></strong>: This table holds payloads in excess
+ * of 16kb and relates back to the <tt>FL_EVENT</tt> table using foreign key
+ * reference via <tt>FLP_EVENT</tt> column.</p>
+ * <pre>
+ * +---------------------+
+ * | FL_PLSPILL          |
+ * +---------------------+
+ * | FLP_EVENT  : BIGINT | (FK into FL_EVENT.FLE_ID)
+ * | FLP_SPILL  : BLOB   |
+ * +---------------------+
+ * </pre>
+ * <p><strong><tt>FL_HEADER</tt></strong>: The table that holds headers. This
+ * table contains the name and value of each header, up to the first 251
+ * bytes each. If a name is longer than 251 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in the <tt>FL_NMSPILL</tt> table.
+ * Similarly if the value is longer than 251 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in the <tt>FL_VLSPILL</tt> table.
+ * Each header record relates back to the <tt>FL_EVENT</tt> table using a
+ * foreign key reference via the <tt>FLH_EVENT</tt> column.</p>
+ * <pre>
+ * +--------------------------+
+ * | FL_HEADER                |
+ * +--------------------------+
+ * | FLH_ID     : BIGINT PK   | (auto-gen sequence)
+ * | FLH_EVENT  : BIGINT      | (FK into FL_EVENT.FLE_ID)
+ * | FLH_NAME   : VARCHAR(251)|
+ * | FLH_VALUE  : VARCHAR(251)|
+ * | FLH_NMSPILL: BOOLEAN     | (true if name spills)
+ * | FLH_VLSPILL: BOOLEAN     | (true if value spills)
+ * +--------------------------+
+ * </pre>
+ * <p><strong><tt>FL_NMSPILL</tt></strong>: The table that holds header names
+ * in excess of 251 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLN_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_NMSPILL           |
+ * +----------------------+
+ * | FLN_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLN_SPILL   : CLOB   |
+ * +----------------------+
+ * </pre>
+ * <p><strong><tt>FL_VLSPILL</tt></strong>: The table that holds header values
+ * in excess of 251 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLV_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_VLSPILL           |
+ * +----------------------+
+ * | FLV_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLV_SPILL   : CLOB   |
+ * +----------------------+
+ * </pre>
+ *
+ * <p><strong>NOTE</strong>: The values that decide the spill boundary
+ * and storage length limits are defined in <tt>ConfigurationConstants</tt>
+ * class.</p>
+ * @see org.apache.flume.channel.jdbc.ConfigurationConstants
+ */
 public class DerbySchemaHandler implements SchemaHandler {
 
   private static final Logger LOGGER =
@@ -48,20 +128,35 @@ public class DerbySchemaHandler implemen
   private static final String COLUMN_FLE_ID = "FLE_ID";
   private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
   private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
+  private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
 
-  private static final String TABLE_FL_PLEXT_NAME = "FL_PLEXT";
-  private static final String TABLE_FL_PLEXT = SCHEMA_FLUME + "."
-      + TABLE_FL_PLEXT_NAME;
-  private static final String COLUMN_FLP_EVENTID = "FLP_EVENTID";
+  private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
+  private static final String TABLE_FL_PLSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_PLSPILL_NAME;
+  private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
   private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
 
   private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
   private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
       + TABLE_FL_HEADER_NAME;
-  private static final String COLUMN_FLH_EVENTID = "FLH_EVENTID";
+  private static final String COLUMN_FLH_ID = "FLH_ID";
+  private static final String COLUMN_FLH_EVENT = "FLH_EVENT";
   private static final String COLUMN_FLH_NAME = "FLH_NAME";
   private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
+  private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
+  private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
 
+  private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
+  private static final String TABLE_FL_NMSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_NMSPILL_NAME;
+  private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
+  private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
+
+  private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
+  private static final String TABLE_FL_VLSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_VLSPILL_NAME;
+  private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
+  private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
 
   public static final String QUERY_CREATE_SCHEMA_FLUME
       = "CREATE SCHEMA " + SCHEMA_FLUME;
@@ -69,25 +164,51 @@ public class DerbySchemaHandler implemen
   public static final String QUERY_CREATE_TABLE_FL_EVENT
       = "CREATE TABLE " + TABLE_FL_EVENT + " ( "
         + COLUMN_FLE_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
-        + "(START WITH 2, INCREMENT BY 1) PRIMARY KEY, "
-        + COLUMN_FLE_PAYLOAD + " VARCHAR(16384) FOR BIT DATA, " // 16kb
-        + COLUMN_FLE_CHANNEL + " VARCHAR(32))";
-
-  public static final String QUERY_CREATE_TABLE_FL_PLEXT
-      = "CREATE TABLE " + TABLE_FL_PLEXT + " ( "
-        + COLUMN_FLP_EVENTID + " BIGINT, "
+        + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+        + COLUMN_FLE_PAYLOAD + " VARCHAR("
+        + ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD
+        + ") FOR BIT DATA, "
+        + COLUMN_FLE_CHANNEL + " VARCHAR("
+        + ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), "
+        + COLUMN_FLE_SPILL + " BOOLEAN)";
+
+  public static final String QUERY_CREATE_TABLE_FL_PLSPILL
+      = "CREATE TABLE " + TABLE_FL_PLSPILL + " ( "
+        + COLUMN_FLP_EVENT + " BIGINT, "
         + COLUMN_FLP_SPILL + " BLOB, "
-        + "FOREIGN KEY (" + COLUMN_FLP_EVENTID + ") REFERENCES "
+        + "FOREIGN KEY (" + COLUMN_FLP_EVENT + ") REFERENCES "
         + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
 
   public static final String QUERY_CREATE_TABLE_FL_HEADER
       = "CREATE TABLE " + TABLE_FL_HEADER + " ( "
-        + COLUMN_FLH_EVENTID + " BIGINT, "
-        + COLUMN_FLH_NAME + " VARCHAR(255), "
-        + COLUMN_FLH_VALUE + " VARCHAR(255), "
-        + "FOREIGN KEY (" + COLUMN_FLH_EVENTID + ") REFERENCES "
+        + COLUMN_FLH_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
+        + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+        + COLUMN_FLH_EVENT + " BIGINT, "
+        + COLUMN_FLH_NAME + " VARCHAR("
+        + ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD + "), "
+        + COLUMN_FLH_VALUE + " VARCHAR("
+        + ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD + "), "
+        + COLUMN_FLH_NMSPILL + " BOOLEAN, "
+        + COLUMN_FLH_VLSPILL + " BOOLEAN, "
+        + "FOREIGN KEY (" + COLUMN_FLH_EVENT + ") REFERENCES "
         + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
 
+  public static final String QUERY_CREATE_TABLE_FL_NMSPILL
+      = "CREATE TABLE " + TABLE_FL_NMSPILL + " ( "
+        + COLUMN_FLN_HEADER + " BIGINT, "
+        + COLUMN_FLN_SPILL + " VARCHAR("
+        + ConfigurationConstants.HEADER_NAME_SPILL_MAX_LENGTH + "), "
+        + "FOREIGN KEY (" + COLUMN_FLN_HEADER + ") REFERENCES "
+        + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+
+  public static final String QUERY_CREATE_TABLE_FL_VLSPILL
+      = "CREATE TABLE " + TABLE_FL_VLSPILL + " ( "
+        + COLUMN_FLV_HEADER + " BIGINT, "
+        + COLUMN_FLV_SPILL + " VARCHAR("
+        + ConfigurationConstants.HEADER_VALUE_SPILL_MAX_LENGTH + "), "
+        + "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
+        + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+
   public static final String COLUMN_LOOKUP_QUERY =
       "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
          + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -100,7 +221,6 @@ public class DerbySchemaHandler implemen
     this.dataSource = dataSource;
   }
 
-
   @Override
   public boolean schemaExists() {
     Connection connection = null;
@@ -149,20 +269,30 @@ public class DerbySchemaHandler implemen
   public void createSchemaObjects() {
     runQuery(QUERY_CREATE_SCHEMA_FLUME);
     runQuery(QUERY_CREATE_TABLE_FL_EVENT);
-    runQuery(QUERY_CREATE_TABLE_FL_PLEXT);
+    runQuery(QUERY_CREATE_TABLE_FL_PLSPILL);
     runQuery(QUERY_CREATE_TABLE_FL_HEADER);
+    runQuery(QUERY_CREATE_TABLE_FL_NMSPILL);
+    runQuery(QUERY_CREATE_TABLE_FL_VLSPILL);
   }
 
   @Override
   public void validateSchema() {
     verifyTableStructure(SCHEMA_FLUME, TABLE_FL_EVENT_NAME,
-        COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL);
+        COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL,
+        COLUMN_FLE_SPILL);
 
-    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLEXT_NAME,
-        COLUMN_FLP_EVENTID, COLUMN_FLP_SPILL);
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLSPILL_NAME,
+        COLUMN_FLP_EVENT, COLUMN_FLP_SPILL);
 
     verifyTableStructure(SCHEMA_FLUME, TABLE_FL_HEADER_NAME,
-        COLUMN_FLH_EVENTID, COLUMN_FLH_NAME, COLUMN_FLH_VALUE);
+        COLUMN_FLH_ID, COLUMN_FLH_EVENT, COLUMN_FLH_NAME, COLUMN_FLH_VALUE,
+        COLUMN_FLH_NMSPILL, COLUMN_FLH_VLSPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_NMSPILL_NAME,
+        COLUMN_FLN_HEADER, COLUMN_FLN_SPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_VLSPILL_NAME,
+        COLUMN_FLV_HEADER, COLUMN_FLV_SPILL);
   }
 
   private void verifyTableStructure(String schemaName, String tableName,

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Thu Oct 13 08:00:59 2011
@@ -171,8 +171,26 @@ public class JdbcChannelProviderImpl imp
 
   @Override
   public void persistEvent(String channelName, Event event) {
-    // TODO Auto-generated method stub
+    PersistableEvent persistableEvent = new PersistableEvent(event);
+    Transaction tx = null;
+    try {
+      tx = getTransaction();
+      tx.begin();
+
+      // Persist the persistableEvent
 
+      tx.commit();
+    } catch (Exception ex) {
+      // getTransaction() itself may have failed, leaving tx unset
+      if (tx != null) {
+        tx.rollback();
+      }
+      throw new JdbcChannelException("Failed to persist event", ex);
+    } finally {
+      if (tx != null) {
+        tx.close();
+      }
+    }
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java Thu Oct 13 08:00:59 2011
@@ -34,10 +34,6 @@ public class JdbcTransactionFactory exte
 
   @Override
   protected JdbcTransactionImpl initialValue() {
-    try {
-      return new JdbcTransactionImpl(dataSource.getConnection(), this);
-    } catch (SQLException ex) {
-      throw new JdbcChannelException("Unable to create connection", ex);
-    }
+    return new JdbcTransactionImpl(dataSource, this);
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Thu Oct 13 08:00:59 2011
@@ -21,6 +21,8 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLWarning;
 
+import javax.sql.DataSource;
+
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.JdbcChannelException;
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@ public class JdbcTransactionImpl impleme
   private static final Logger LOGGER =
       LoggerFactory.getLogger(JdbcTransactionImpl.class);
 
+  private final DataSource dataSource;
   private Connection connection;
   private JdbcTransactionFactory txFactory;
   boolean active = true;
@@ -39,16 +42,10 @@ public class JdbcTransactionImpl impleme
 
   boolean rollback = false;
 
-  protected JdbcTransactionImpl(Connection conn,
+  protected JdbcTransactionImpl(DataSource dataSource,
       JdbcTransactionFactory factory) {
-    connection = conn;
+    this.dataSource = dataSource;
     txFactory = factory;
-
-    try {
-      connection.clearWarnings();
-    } catch (SQLException ex) {
-      LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
-    }
   }
 
   @Override
@@ -56,6 +53,20 @@ public class JdbcTransactionImpl impleme
     if (!active) {
       throw new JdbcChannelException("Inactive transaction");
     }
+    if (count == 0) {
+      // Lease a connection now
+      try {
+        connection = dataSource.getConnection();
+      } catch (SQLException ex) {
+        throw new JdbcChannelException("Unable to lease connection", ex);
+      }
+      // Clear any prior warnings on the connection
+      try {
+        connection.clearWarnings();
+      } catch (SQLException ex) {
+        LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
+      }
+    }
     count++;
     LOGGER.debug("Tx count-begin: " + count + ", rollback: " + rollback);
   }
@@ -77,6 +88,7 @@ public class JdbcTransactionImpl impleme
     if (!active) {
       throw new JdbcChannelException("Inactive transaction");
     }
+    LOGGER.warn("Marking transaction for rollback");
     rollback = true;
     LOGGER.debug("Tx count-rollback: " + count + ", rollback: " + rollback);
   }

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1182709&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Thu Oct 13 08:00:59 2011
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+
+/**
+ * An {@link Event} split into the inline and spill portions used by the
+ * JDBC channel schema. The body is divided at
+ * {@link ConfigurationConstants#PAYLOAD_LENGTH_THRESHOLD} bytes and every
+ * header name and value at its respective header length threshold.
+ */
+public class PersistableEvent {
+
+  private long eventId;
+  private byte[] payload;
+  private byte[] spill;
+  private List<HeaderEntry> headers;
+
+  /**
+   * Copies the body and headers of the given event, splitting each into an
+   * inline part and, where it exceeds its threshold, a spill part.
+   *
+   * @param event the event to capture; its body must not be null
+   */
+  public PersistableEvent(Event event) {
+
+    byte[] givenPayload = event.getBody();
+    int inlineLimit = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+    // A payload of exactly the threshold length still fits inline, so only
+    // strictly larger payloads spill.
+    if (givenPayload.length <= inlineLimit) {
+      payload = Arrays.copyOf(givenPayload, givenPayload.length);
+      spill = null;
+    } else {
+      payload = Arrays.copyOfRange(givenPayload, 0, inlineLimit);
+      spill = Arrays.copyOfRange(givenPayload, inlineLimit,
+          givenPayload.length);
+    }
+
+    Map<String, String> headerMap = event.getHeaders();
+    if (headerMap != null && headerMap.size() > 0) {
+      headers = new ArrayList<HeaderEntry>();
+      for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+        headers.add(new HeaderEntry(entry.getKey(), entry.getValue()));
+      }
+    }
+  }
+
+  /**
+   * Reassembles the full event body.
+   *
+   * @return a new array holding the inline payload followed by any spill
+   */
+  public byte[] getPayload() {
+    if (spill == null) {
+      return Arrays.copyOf(payload, payload.length);
+    }
+    byte[] result = new byte[payload.length + spill.length];
+    System.arraycopy(payload, 0, result, 0, payload.length);
+    System.arraycopy(spill, 0, result, payload.length, spill.length);
+    return result;
+  }
+
+  /**
+   * Rebuilds the header map from the stored entries.
+   *
+   * @return a new map of header names to values, or <tt>null</tt> if the
+   *         event had no headers
+   */
+  public Map<String, String> getHeaders() {
+    if (headers == null) {
+      return null;
+    }
+    Map<String, String> headerMap = new HashMap<String, String>();
+    for (HeaderEntry entry : headers) {
+      headerMap.put(entry.getName(), entry.getValue());
+    }
+    return headerMap;
+  }
+
+  /** A single header whose name and value are each split for storage. */
+  public static class HeaderEntry {
+
+    private SpillableString nameString;
+    private SpillableString valueString;
+
+    /**
+     * @param name the header name; must not be null
+     * @param value the header value; must not be null
+     */
+    public HeaderEntry(String name, String value) {
+      nameString = new SpillableString(name,
+          ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD);
+      valueString = new SpillableString(value,
+          ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD);
+    }
+
+    /** @return the complete header name */
+    public String getName() {
+      return nameString.getString();
+    }
+
+    /** @return the complete header value */
+    public String getValue() {
+      return valueString.getString();
+    }
+  }
+
+  /**
+   * A string divided into a base that fits within a byte threshold and an
+   * optional spill holding the remainder.
+   */
+  private static class SpillableString {
+
+    private String base;
+    private String spill;
+
+    /**
+     * Splits the given string so that the UTF-8 encoding of the base does
+     * not exceed the threshold. The split always falls on a code point
+     * boundary so a surrogate pair is never divided between base and spill.
+     *
+     * @param string the string to split; must not be null
+     * @param threshold the maximum number of UTF-8 bytes kept in the base
+     */
+    public SpillableString(String string, int threshold) {
+      int splitIndex = findSplitIndex(string, threshold);
+      if (splitIndex == string.length()) {
+        base = string;
+      } else {
+        base = string.substring(0, splitIndex);
+        spill = string.substring(splitIndex);
+      }
+    }
+
+    /**
+     * Returns the index of the first char that no longer fits within the
+     * given number of UTF-8 bytes, or the string length if everything fits.
+     */
+    private static int findSplitIndex(String string, int threshold) {
+      int usedBytes = 0;
+      int index = 0;
+      while (index < string.length()) {
+        int codePoint = string.codePointAt(index);
+        int width = utf8Length(codePoint);
+        if (usedBytes + width > threshold) {
+          break;
+        }
+        usedBytes += width;
+        index += Character.charCount(codePoint);
+      }
+      return index;
+    }
+
+    /** Returns the number of bytes UTF-8 needs for the given code point. */
+    private static int utf8Length(int codePoint) {
+      if (codePoint < 0x80) {
+        return 1;
+      } else if (codePoint < 0x800) {
+        return 2;
+      } else if (codePoint < 0x10000) {
+        return 3;
+      }
+      return 4;
+    }
+
+    /** @return the part that fits within the threshold */
+    public String getBase() {
+      return base;
+    }
+
+    /** @return the part beyond the threshold, or null if nothing spilled */
+    public String getSpill() {
+      return spill;
+    }
+
+    /** @return the original string, base and spill rejoined */
+    public String getString() {
+      if (spill == null) {
+        return base;
+      }
+      return base + spill;
+    }
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Thu Oct 13 08:00:59 2011
@@ -28,18 +28,32 @@ public class TestDerbySchemaHandlerQueri
 
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_EVENT
        = "CREATE TABLE FLUME.FL_EVENT ( FLE_ID BIGINT GENERATED ALWAYS AS "
-           + "IDENTITY (START WITH 2, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
-           + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(32))";
-
-  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT
-       = "CREATE TABLE FLUME.FL_PLEXT ( FLP_EVENTID BIGINT, FLP_SPILL BLOB, "
-           + "FOREIGN KEY (FLP_EVENTID) REFERENCES FLUME.FL_EVENT (FLE_ID))";
+           + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
+           + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(64), "
+           + "FLE_SPILL BOOLEAN)";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL
+       = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB, "
+           + "FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID))";
 
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER
-       = "CREATE TABLE FLUME.FL_HEADER ( FLH_EVENTID BIGINT, FLH_NAME "
-           + "VARCHAR(255), FLH_VALUE VARCHAR(255), FOREIGN KEY (FLH_EVENTID) "
+       = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
+           + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+           + "FLH_EVENT BIGINT, FLH_NAME VARCHAR(251), "
+           + "FLH_VALUE VARCHAR(251), FLH_NMSPILL BOOLEAN, "
+           + "FLH_VLSPILL BOOLEAN, FOREIGN KEY (FLH_EVENT) "
            + "REFERENCES FLUME.FL_EVENT (FLE_ID))";
 
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL
+       = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
+           + "VARCHAR(32517), FOREIGN KEY (FLN_HEADER) REFERENCES "
+           + "FLUME.FL_HEADER (FLH_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL
+       = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
+            + "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
+            + "FLUME.FL_HEADER (FLH_ID))";
+
   @Test
   public void testCreateQueries() {
 
@@ -49,12 +63,18 @@ public class TestDerbySchemaHandlerQueri
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
         EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
 
-    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLEXT,
-        EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT);
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL,
+        EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL);
 
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER,
         EXPECTED_QUERY_CREATE_TABLE_FL_HEADER);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL,
+        EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
+        EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
+
   }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Thu Oct 13 08:00:59 2011
@@ -4,8 +4,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 
+import junit.framework.Assert;
+
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -18,6 +21,7 @@ public class TestJdbcChannelProvider {
 
   private Properties derbyProps = new Properties();
   private File derbyDbDir;
+  private JdbcChannelProviderImpl provider;
 
   @Before
   public void setUp() throws IOException {
@@ -52,16 +56,39 @@ public class TestJdbcChannelProvider {
 
   @Test
   public void testDerbySetup() {
-    JdbcChannelProviderImpl jdbcProviderImpl =
-        new JdbcChannelProviderImpl();
+    provider = new JdbcChannelProviderImpl(); // held in a field so tearDown() can close it if an assertion fails first
+
+    provider.initialize(derbyProps);
+
+    Transaction tx1 = provider.getTransaction();
+    tx1.begin();
+
+    Transaction tx2 = provider.getTransaction();
+
+    Assert.assertSame(tx1, tx2); // a request made while tx1 is open is expected to return that same instance
+    tx2.begin();
+    tx2.close();
+    tx1.close();
 
-    jdbcProviderImpl.initialize(derbyProps);
+    Transaction tx3 = provider.getTransaction();
+    Assert.assertNotSame(tx1, tx3); // once both close() calls are done, a different instance is expected
 
-    Transaction tx = jdbcProviderImpl.getTransaction();
-    tx.begin();
-    tx.begin();
-    tx.close();
-    tx.close();
-    jdbcProviderImpl.close();
+    tx3.begin();
+    tx3.close();
+
+    provider.close();
+    provider = null; // already closed above; cleared so tearDown() does not close it a second time
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    try {
+      if (provider != null) { // null when the test closed it itself or never created one
+        provider.close();
+      }
+    } catch (Exception ex) { // best effort: a failed close is logged, not rethrown
+      LOGGER.error("Unable to close provider", ex);
+    }
+    provider = null;
   }
 }

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1182709&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Thu Oct 13 08:00:59 2011
@@ -0,0 +1,181 @@
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises {@link PersistableEvent} with random payloads and headers whose
+ * sizes sit just below, at, and just above the thresholds declared in
+ * {@link ConfigurationConstants}, and checks that the event hands back the
+ * same payload and headers it was built from.
+ */
+public class TestPersistentEvent {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TestPersistentEvent.class);
+
+  /** Alphabet from which {@link #generateString(Random, int)} draws. */
+  private static final String[] CHARS = new String[] {
+    "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
+    "s","t","u","v","w","x","y","z",
+    "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
+    "S","T","U","V","W","X","Y","Z",
+    "0","1","2","3","4","5","6","7","8","9",
+    "!","@","#","$","%","^","&","*","(",")",
+    "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
+  };
+
+  /**
+   * Builds eight events of increasing payload size, with header names and
+   * values straddling their length thresholds, and verifies each of them.
+   */
+  @Test
+  public void testMarshalling() {
+    // The data is random, so log the seed: without it a failing run cannot
+    // be reproduced.
+    long seed = System.currentTimeMillis();
+    LOGGER.info("Random seed: {}", seed);
+    Random rnd = new Random(seed);
+
+    int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+    int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+
+    byte[] s1 = new byte[1];
+    rnd.nextBytes(s1);
+    runTest(s1, null);
+
+    byte[] s2 = new byte[2];
+    rnd.nextBytes(s2);
+    runTest(s2, new HashMap<String, String>());
+
+    byte[] s3 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 2];
+    rnd.nextBytes(s3);
+    Map<String, String> m3 = new HashMap<String, String>();
+    m3.put(generateString(rnd, 1), generateString(rnd, 1));
+    runTest(s3, m3);
+
+    byte[] s4 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 1];
+    rnd.nextBytes(s4);
+    Map<String, String> m4 = new HashMap<String, String>();
+    m4.put(generateString(rnd, nameLimit - 21), "w");
+    m4.put(generateString(rnd, nameLimit - 2), "x");
+    m4.put(generateString(rnd, nameLimit - 1), "y");
+    m4.put(generateString(rnd, nameLimit), "z");
+    m4.put(generateString(rnd, nameLimit + 1), "a");
+    m4.put(generateString(rnd, nameLimit + 2), "b");
+    m4.put(generateString(rnd, nameLimit + 21), "c");
+    runTest(s4, m4);
+
+    byte[] s5 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD];
+    rnd.nextBytes(s5);
+    Map<String, String> m5 = new HashMap<String, String>();
+    m5.put("w", generateString(rnd, valLimit - 21));
+    m5.put("x", generateString(rnd, valLimit - 2));
+    m5.put("y", generateString(rnd, valLimit - 1));
+    m5.put("z", generateString(rnd, valLimit));
+    m5.put("a", generateString(rnd, valLimit + 1));
+    m5.put("b", generateString(rnd, valLimit + 2));
+    m5.put("c", generateString(rnd, valLimit + 21));
+    runTest(s5, m5);
+
+    byte[] s6 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 1];
+    rnd.nextBytes(s6);
+    Map<String, String> m6 = new HashMap<String, String>();
+    m6.put(generateString(rnd, nameLimit - 21),
+        generateString(rnd, valLimit - 21));
+    m6.put(generateString(rnd, nameLimit - 2),
+        generateString(rnd, valLimit - 2));
+    m6.put(generateString(rnd, nameLimit - 1),
+        generateString(rnd, valLimit - 1));
+    m6.put(generateString(rnd, nameLimit),
+        generateString(rnd, valLimit));
+    m6.put(generateString(rnd, nameLimit + 1),
+        generateString(rnd, valLimit + 1));
+    m6.put(generateString(rnd, nameLimit + 2),
+        generateString(rnd, valLimit + 2));
+    m6.put(generateString(rnd, nameLimit + 21),
+        generateString(rnd, valLimit + 21));
+
+    runTest(s6, m6);
+
+    byte[] s7 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 2];
+    rnd.nextBytes(s7);
+    runTest(s7, null);
+
+    byte[] s8 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 27];
+    rnd.nextBytes(s8);
+    runTest(s8, null);
+  }
+
+  /**
+   * Returns a string of {@code size} characters picked from {@link #CHARS};
+   * the result is empty when {@code size} is zero or negative.
+   */
+  private String generateString(Random rnd, int size) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      // nextInt(bound) is always in [0, bound). Math.abs(nextInt()) is
+      // negative for Integer.MIN_VALUE and would index out of bounds.
+      sb.append(CHARS[rnd.nextInt(CHARS.length)]);
+    }
+    LOGGER.debug("String: {}", sb);
+    return sb.toString();
+  }
+
+  /**
+   * Wraps the payload and headers in a {@link PersistableEvent} and asserts
+   * that it returns equal data. A {@code null} header map and an empty one
+   * are treated as equivalent on both sides. The caller's map is not
+   * modified.
+   */
+  private void runTest(byte[] payload, Map<String, String> headers) {
+    PersistableEvent pe = new PersistableEvent(new MockEvent(payload, headers));
+    Assert.assertArrayEquals(payload, pe.getPayload());
+    Assert.assertEquals(nullToEmpty(headers), nullToEmpty(pe.getHeaders()));
+  }
+
+  /** Returns {@code map}, or a new empty map when {@code map} is null. */
+  private static Map<String, String> nullToEmpty(Map<String, String> map) {
+    return (map == null) ? new HashMap<String, String>() : map;
+  }
+
+  /** Minimal {@link Event} with fixed body and headers; setters do nothing. */
+  private static class MockEvent implements Event {
+
+    private final byte[] payload;
+    private final Map<String, String> headers;
+
+    private MockEvent(byte[] payload, Map<String, String> headers) {
+      this.payload = payload;
+      this.headers = headers;
+    }
+
+    @Override
+    public Map<String, String> getHeaders() {
+      return headers;
+    }
+
+    @Override
+    public void setHeaders(Map<String, String> headers) {
+      // Intentionally empty: the mock ignores updates.
+    }
+
+    @Override
+    public byte[] getBody() {
+      return payload;
+    }
+
+    @Override
+    public void setBody(byte[] body) {
+      // Intentionally empty: the mock ignores updates.
+    }
+
+  }
+}