You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2023/03/21 00:24:38 UTC

[flume-jdbc] branch main created (now 83686b9)

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flume-jdbc.git


      at 83686b9  FLUME-3458 - Move JDBC Channel to its own repo

This branch includes the following new commits:

     new 83686b9  FLUME-3458 - Move JDBC Channel to its own repo

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flume-jdbc] 01/01: FLUME-3458 - Move JDBC Channel to its own repo

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flume-jdbc.git

commit 83686b9fe010cd7c24fd01497e132bf51334e509
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Mon Mar 20 17:24:25 2023 -0700

    FLUME-3458 - Move JDBC Channel to its own repo
---
 .asf.yaml                                          |   40 +
 CHANGELOG                                          |    7 +
 LICENSE.txt                                        |  245 +++++
 NOTICE.txt                                         |    5 +
 README.md                                          |   58 ++
 RELEASE-NOTES.txt                                  |   26 +
 checkstyle-header.txt                              |   16 +
 findbugs-exclude-filter.xml                        |   31 +
 flume-jdbc-channel/pom.xml                         |   81 ++
 .../flume/channel/jdbc/ConfigurationConstants.java |  173 ++++
 .../apache/flume/channel/jdbc/DatabaseType.java    |   69 ++
 .../org/apache/flume/channel/jdbc/JdbcChannel.java |   93 ++
 .../flume/channel/jdbc/JdbcChannelException.java   |   38 +
 .../flume/channel/jdbc/JdbcChannelProvider.java    |   63 ++
 .../channel/jdbc/JdbcChannelProviderFactory.java   |   62 ++
 .../flume/channel/jdbc/TransactionIsolation.java   |   53 +
 .../channel/jdbc/impl/DerbySchemaHandler.java      | 1091 ++++++++++++++++++++
 .../channel/jdbc/impl/JdbcChannelProviderImpl.java |  618 +++++++++++
 .../channel/jdbc/impl/JdbcTransactionFactory.java  |   38 +
 .../channel/jdbc/impl/JdbcTransactionImpl.java     |  190 ++++
 .../channel/jdbc/impl/MySQLSchemaHandler.java      |   68 ++
 .../flume/channel/jdbc/impl/PersistableEvent.java  |  369 +++++++
 .../flume/channel/jdbc/impl/SchemaHandler.java     |   77 ++
 .../channel/jdbc/impl/SchemaHandlerFactory.java    |   49 +
 .../channel/jdbc/BaseJdbcChannelProviderTest.java  |  383 +++++++
 .../org/apache/flume/channel/jdbc/MockEvent.java   |   60 ++
 .../apache/flume/channel/jdbc/MockEventUtils.java  |  119 +++
 .../flume/channel/jdbc/TestDatabaseTypeEnum.java   |   80 ++
 .../jdbc/TestDerbySchemaHandlerQueries.java        |  252 +++++
 .../channel/jdbc/TestJdbcChannelProvider.java      |   28 +
 .../channel/jdbc/TestJdbcChannelProviderNoFK.java  |   29 +
 .../flume/channel/jdbc/TestPersistentEvent.java    |  116 +++
 .../jdbc/TestTransactionIsolationLevelEnum.java    |   69 ++
 flume-jdbc-dist/pom.xml                            |  138 +++
 flume-jdbc-dist/src/assembly/bin.xml               |   50 +
 flume-jdbc-dist/src/assembly/src.xml               |   45 +
 pom.xml                                            |  275 +++++
 37 files changed, 5204 insertions(+)

diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 0000000..75e4726
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# `.asf.yaml` is a branch-specific YAML configuration file for Git repositories to control features such as notifications, GitHub settings, etc.
+# See its documentation for details: https://cwiki.apache.org/confluence/display/INFRA/Git+-+.asf.yaml+features
+
+notifications:
+  # GitHub already provides notifications for PRs and issues.
+  # Please don't duplicate that noise here!
+  commits: commits@flume.apache.org
+  jira_options: link label
+github:
+  description: "Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of event data"
+  homepage: https://flume.apache.org/
+  features:
+    issues: true
+  del_branch_on_merge: true
+  autolink_jira:
+    - FLUME
+  labels:
+    - apache
+    - api
+    - java
+    - jvm
+    - library
+    - flume
+  protected_branches:
+    main: {}
diff --git a/CHANGELOG b/CHANGELOG
new file mode 100644
index 0000000..a1dec55
--- /dev/null
+++ b/CHANGELOG
@@ -0,0 +1,7 @@
+Release Notes - Flume JDBC - Version 2.0.0
+
+** Improvement
+    * [FLUME-3458] - Move the JDBC Channel to its own repo.
+
+
+
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..9fa7156
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,245 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+====
+
+The following files are included under the 2-Clause BSD License
+
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ar.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_bg.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_da.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_de.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_es.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fa.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fi.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fr.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hi.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hu.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_it.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_nl.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_no.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_pt.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ro.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ru.txt
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_sv.txt
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..a2c2ff9
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,5 @@
+Apache Flume JDBC
+Copyright 2022-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..10df09c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,58 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Welcome to Apache Flume JDBC!
+
+Apache Flume is a distributed, reliable, and available service for efficiently
+collecting, aggregating, and moving large amounts of event data. It has a simple
+and flexible architecture based on streaming data flows. It is robust and fault
+tolerant with tunable reliability mechanisms and many failover and recovery
+mechanisms. The system is centrally managed and allows for intelligent dynamic
+management. It uses a simple extensible data model that allows for online
+analytic application.
+
+The Apache Flume JDBC module provides a channel to store events temporarily in a database.
+
+Apache Flume JDBC is open-sourced under the Apache Software Foundation License v2.0.
+
+## Documentation
+
+Documentation is included in the binary distribution under the docs directory.
+In source form, it can be found in the flume-ng-doc directory.
+
+The Flume 1.x guide and FAQ are available here:
+
+* https://cwiki.apache.org/FLUME
+* https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
+
+## Contact us!
+
+* Mailing lists: https://cwiki.apache.org/confluence/display/FLUME/Mailing+Lists
+* Slack channel #flume on https://the-asf.slack.com/
+
+Bug and Issue tracker.
+
+* https://github.com/apache/flume-jdbc/issues
+
+## Compiling Flume JDBC
+
+Compiling Flume JDBC requires the following tools:
+
+* Oracle Java JDK 8
+* Apache Maven 3.x
diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt
new file mode 100644
index 0000000..ef72912
--- /dev/null
+++ b/RELEASE-NOTES.txt
@@ -0,0 +1,26 @@
+Apache Flume JDBC 2.0.0
+
+CONTENTS
+1. What is Apache Flume JDBC
+2. Major changes in this Release
+3. How to Get Involved
+4. How to Report Issues
+
+1. What is Apache Flume JDBC
+Flume is a distributed, reliable, and available service for
+efficiently collecting, aggregating, and moving large amounts of event
+data. Flume JDBC allows Flume to store data temporarily in a relational database.
+
+2. Major changes in this Release
+For a detailed list of changes, please see the CHANGELOG file included
+in this distribution.
+
+3. How to Get Involved
+The Apache Flume project really needs and appreciates any contributions,
+including documentation help, source code and feedback. If you are interested
+in contributing, please visit:
+https://cwiki.apache.org/confluence/display/FLUME/How+to+Contribute
+
+4. How to Report Issues
+The Apache Flume JDBC project uses GitHub issues for issue tracking. Please see
+https://github.com/apache/flume-jdbc/issues
diff --git a/checkstyle-header.txt b/checkstyle-header.txt
new file mode 100644
index 0000000..4f33236
--- /dev/null
+++ b/checkstyle-header.txt
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
diff --git a/findbugs-exclude-filter.xml b/findbugs-exclude-filter.xml
new file mode 100644
index 0000000..327be31
--- /dev/null
+++ b/findbugs-exclude-filter.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="iso-8859-1"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- ===================================================================== -->
+<!-- $Id: findbugs-exclude-filter.xml 773234 2009-05-09 15:27:59Z rgoers $ -->
+<!-- ===================================================================== -->
+<FindBugsFilter>
+  <!-- Enable only high priority warnings -->
+  <Match>
+    <Priority value="2"/>
+  </Match>
+
+  <Match>
+    <Priority value="3"/>
+  </Match>
+</FindBugsFilter>
diff --git a/flume-jdbc-channel/pom.xml b/flume-jdbc-channel/pom.xml
new file mode 100644
index 0000000..79ed9f8
--- /dev/null
+++ b/flume-jdbc-channel/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-jdbc-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-jdbc-channel</artifactId>
+  <name>Flume NG JDBC channel</name>
+
+  <properties>
+    <!-- TODO fix spotbugs/pmd violations -->
+    <spotbugs.maxAllowedViolations>31</spotbugs.maxAllowedViolations>
+    <pmd.maxAllowedViolations>16</pmd.maxAllowedViolations>
+    <module.name>org.apache.flume.channel.jdbc</module.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-dbcp</groupId>
+      <artifactId>commons-dbcp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
new file mode 100644
index 0000000..8c33d69
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+/**
+ * Contains configuration keys used by the JDBC channel implementation.
+ */
+public final class ConfigurationConstants {
+
+  public static final String PREFIX = "org.apache.flume.channel.jdbc.";
+
+  public static final String CONFIG_JDBC_SYSPROP_PREFIX = "sysprop.";
+
+  /**
+   * @deprecated use {@link #CONFIG_JDBC_SYSPROP_PREFIX} instead
+   */
+  public static final String OLD_CONFIG_JDBC_SYSPROP_PREFIX =
+      PREFIX + CONFIG_JDBC_SYSPROP_PREFIX;
+
+  public static final String CONFIG_JDBC_DRIVER_CLASS = "driver.class";
+
+  /**
+   * @deprecated use {@link #CONFIG_JDBC_DRIVER_CLASS} instead.
+   */
+  public static final String OLD_CONFIG_JDBC_DRIVER_CLASS =
+      PREFIX + CONFIG_JDBC_DRIVER_CLASS;
+
+  public static final String CONFIG_USERNAME = "db.username";
+
+  /**
+   * @deprecated use {@link #CONFIG_USERNAME} instead.
+   */
+  public static final String OLD_CONFIG_USERNAME =
+      PREFIX + CONFIG_USERNAME;
+
+  public static final String CONFIG_PASSWORD = "db.password";
+
+  /**
+   * @deprecated use {@link #CONFIG_PASSWORD} instead.
+   */
+  public static final String OLD_CONFIG_PASSWORD =
+      PREFIX + CONFIG_PASSWORD;
+
+  public static final String CONFIG_URL = "driver.url";
+
+  /**
+   * @deprecated use {@link #CONFIG_URL} instead.
+   */
+  public static final String OLD_CONFIG_URL =
+      PREFIX + CONFIG_URL;
+
+  public static final String CONFIG_JDBC_PROPS_FILE =
+      "connection.properties.file";
+
+  /**
+   * @deprecated use {@link #CONFIG_JDBC_PROPS_FILE} instead.
+   */
+  public static final String OLD_CONFIG_JDBC_PROPS_FILE =
+      PREFIX + CONFIG_JDBC_PROPS_FILE;
+
+  public static final String CONFIG_DATABASE_TYPE = "db.type";
+
+  /**
+   * @deprecated use {@link #CONFIG_DATABASE_TYPE} instead.
+   */
+  public static final String OLD_CONFIG_DATABASE_TYPE =
+      PREFIX + CONFIG_DATABASE_TYPE;
+
+  public static final String CONFIG_CREATE_SCHEMA = "create.schema";
+
+  /**
+   * @deprecated use {@link #CONFIG_CREATE_SCHEMA} instead.
+   */
+  public static final String OLD_CONFIG_CREATE_SCHEMA =
+      PREFIX + CONFIG_CREATE_SCHEMA;
+
+  public static final String CONFIG_CREATE_INDEX = "create.index";
+
+  /**
+   * @deprecated use {@link #CONFIG_CREATE_INDEX} instead.
+   */
+  public static final String OLD_CONFIG_CREATE_INDEX =
+      PREFIX + CONFIG_CREATE_INDEX;
+
+  public static final String CONFIG_CREATE_FK = "create.foreignkey";
+
+  /**
+   * @deprecated use {@link #CONFIG_CREATE_FK} instead.
+   */
+  public static final String OLD_CONFIG_CREATE_FK =
+      PREFIX + CONFIG_CREATE_FK;
+
+  public static final String CONFIG_TX_ISOLATION_LEVEL =
+      "transaction.isolation";
+
+  /**
+   * @deprecated use {@link #CONFIG_TX_ISOLATION_LEVEL} instead.
+   */
+  public static final String OLD_CONFIG_TX_ISOLATION_LEVEL =
+      PREFIX + CONFIG_TX_ISOLATION_LEVEL;
+
+  public static final String CONFIG_MAX_CONNECTIONS = "maximum.connections";
+
+  /**
+   * @deprecated use {@link #CONFIG_MAX_CONNECTIONS} instead
+   */
+  public static final String OLD_CONFIG_MAX_CONNECTIONS =
+      PREFIX + CONFIG_MAX_CONNECTIONS;
+
+  public static final String CONFIG_MAX_CAPACITY = "maximum.capacity";
+
+  /**
+   * @deprecated use {@link #CONFIG_MAX_CAPACITY} instead.
+   */
+  public static final String OLD_CONFIG_MAX_CAPACITY =
+      PREFIX + CONFIG_MAX_CAPACITY;
+
+  // Built in constants for JDBC Channel implementation
+
+  /**
+   * The length for payload bytes that will be stored inline. Payloads larger
+   * than this length will spill into BLOB.
+   */
+  public static int PAYLOAD_LENGTH_THRESHOLD = 16384; // 16kb
+
+  /**
+   * The length of header name in bytes that will be stored inline. Header
+   * names longer than this number will spill over into CLOB.
+   */
+  public static int HEADER_NAME_LENGTH_THRESHOLD = 251;
+
+  /**
+   * The length of header value in bytes that will be stored inline. Header
+   * values longer than this number will spill over into CLOB.
+   */
+  public static int HEADER_VALUE_LENGTH_THRESHOLD = 251;
+
+  /**
+   * The maximum length of channel name.
+   */
+  public static int CHANNEL_NAME_MAX_LENGTH = 64;
+
+  /**
+   * The maximum spill size for header names. Together with the value of
+   * HEADER_NAME_LENGTH_THRESHOLD this adds up to 32kb.
+   */
+  public static int HEADER_NAME_SPILL_MAX_LENGTH = 32517;
+
+  /**
+   * The maximum spill size for header values. Together with the value of
+   * HEADER_VALUE_LENGTH_THRESHOLD, this adds up to 32kb.
+   */
+  public static int HEADER_VALUE_SPILL_MAX_LENGTH = 32517;
+
+  private ConfigurationConstants() {
+    // Disable object creation
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java
new file mode 100644
index 0000000..183a731
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Locale;
+
+public enum DatabaseType {
+  /** All other databases */
+  OTHER("OTHER", null),
+
+  /** Apache Derby */
+  DERBY("DERBY", "values(1)"),
+
+  /** MySQL */
+  MYSQL("MYSQL", "select 1"),
+
+  /** PostgreSQL */
+  POSTGRESQL("POSTGRESQL", null),
+
+  /** Oracle */
+  ORACLE("ORACLE", null);
+
+  private final String name;
+  private final String validationQuery;
+
+  private DatabaseType(String name, String validationQuery) {
+    this.name = name;
+    this.validationQuery = validationQuery;
+  }
+
+  public String toString() {
+    return getName();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getValidationQuery() {
+    return validationQuery;
+  }
+
+  public static DatabaseType getByName(String dbName) {
+    DatabaseType type = null;
+    try {
+      type = DatabaseType.valueOf(dbName.trim().toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException ex) {
+      type = DatabaseType.OTHER;
+    }
+
+    return type;
+  }
+
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
new file mode 100644
index 0000000..fba6e7b
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.channel.AbstractChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A JDBC based channel implementation.
+ * </p>
+ * <p>
+ * JdbcChannel is marked
+ * {@link org.apache.flume.annotations.InterfaceAudience.Private} because it
+ * should only be instantiated via a configuration. For example, users should
+ * certainly use JdbcChannel but not by instantiating JdbcChannel objects.
+ * Meaning the label Private applies to user-developers not user-operators.
+ * In cases where a Channel is required by instantiated by user-developers
+ * {@link org.apache.flume.channel.MemoryChannel} should be used.
+ * <p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+@Disposable
+public class JdbcChannel extends AbstractChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);
+
+  private JdbcChannelProvider provider;
+
+  /**
+   * Instantiates a new JDBC Channel.
+   */
+  public JdbcChannel() {
+  }
+
+  @Override
+  public void put(Event event) throws ChannelException {
+    getProvider().persistEvent(getName(), event);
+  }
+
+  @Override
+  public Event take() throws ChannelException {
+    return getProvider().removeEvent(getName());
+  }
+
+  @Override
+  public Transaction getTransaction() {
+    return getProvider().getTransaction();
+  }
+
+  @Override
+  public void stop() {
+    JdbcChannelProviderFactory.releaseProvider(getName());
+    provider = null;
+
+    super.stop();
+  }
+
+  private JdbcChannelProvider getProvider() {
+    return provider;
+  }
+
+  @Override
+  public void configure(Context context) {
+    provider = JdbcChannelProviderFactory.getProvider(context, getName());
+
+    LOG.info("JDBC Channel initialized: " + getName());
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java
new file mode 100644
index 0000000..17ebeed
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.ChannelException;
+
+public class JdbcChannelException extends ChannelException {
+
+  private static final long serialVersionUID = -5566109526732929679L;
+
+  public JdbcChannelException(String message) {
+    super(message);
+  }
+
+  public JdbcChannelException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  public JdbcChannelException(Exception cause) {
+    super(cause);
+  }
+
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
new file mode 100644
index 0000000..76bc627
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+/**
+ * Service provider interface for JDBC channel providers.
+ */
+public interface JdbcChannelProvider {
+
+  /**
+   * Initializes the channel provider. This method must be called before
+   * the channel can be used in any way.
+   * @param context the configuration for the system
+   */
+  public void initialize(Context context);
+
+  /**
+   * Deinitializes the channel provider. Once this method is called, the
+   * channel provider cannot be used and must be discarded.
+   */
+  public void close();
+
+  /**
+   * Writes the event to the persistent store.
+   * @param channelName
+   * @param event
+   */
+  public void persistEvent(String channelName, Event event);
+
+
+  /**
+   * Removes the next event for the named channel from the underlying
+   * persistent store.
+   * @param channelName
+   * @return
+   */
+  public Event removeEvent(String channelName);
+
+  /**
+   * @return the transaction associated with the current thread.
+   */
+  public Transaction getTransaction();
+
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
new file mode 100644
index 0000000..6fbd6ef
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+
+public final class JdbcChannelProviderFactory {
+
+  private static Set<String> INSTANCES = new HashSet<String>();
+  private static JdbcChannelProvider PROVIDER;
+
+  public static synchronized JdbcChannelProvider getProvider(
+      Context context, String name) {
+    if (PROVIDER == null) {
+      PROVIDER = new JdbcChannelProviderImpl();
+      PROVIDER.initialize(context);
+    }
+
+    if (!INSTANCES.add(name)) {
+      throw new JdbcChannelException("Attempt to initialize multiple "
+           + "channels with same name: " + name);
+    }
+
+    return PROVIDER;
+  }
+
+  public static synchronized void releaseProvider(String name) {
+    if (!INSTANCES.remove(name)) {
+      throw new JdbcChannelException("Attempt to release non-existant channel: "
+          + name);
+    }
+
+    if (INSTANCES.size() == 0) {
+      // Deinitialize the provider
+      PROVIDER.close();
+      PROVIDER = null;
+    }
+  }
+
+  private JdbcChannelProviderFactory() {
+    // disable explicit object creation
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java
new file mode 100644
index 0000000..5cc2489
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.sql.Connection;
+import java.util.Locale;
+
+public enum TransactionIsolation {
+
+  READ_UNCOMMITTED("READ_UNCOMMITTED", Connection.TRANSACTION_READ_UNCOMMITTED),
+  READ_COMMITTED("READ_COMMITTED", Connection.TRANSACTION_READ_COMMITTED),
+  REPEATABLE_READ("REPEATABLE_READ", Connection.TRANSACTION_REPEATABLE_READ),
+  SERIALIZABLE("SERIALIZABLE", Connection.TRANSACTION_SERIALIZABLE);
+
+  private final String name;
+  private final int code;
+
+  private TransactionIsolation(String name, int code) {
+    this.name = name;
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String toString() {
+    return getName();
+  }
+
+  public static TransactionIsolation getByName(String name) {
+    return valueOf(name.trim().toUpperCase(Locale.ENGLISH));
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
new file mode 100644
index 0000000..56eebfd
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
@@ -0,0 +1,1091 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.HeaderEntry;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.SpillableString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Schema handler for Derby Database. This handler uses the following schema:
+ * </p>
+ *
+ * <p><strong><tt>FL_EVENT</tt></strong>: The main event table. This table
+ * contains an auto-generated event ID along with the first 16kb of payload
+ * data. If the payload is larger than 16kb, a spill indicator flag is set and
+ * the remaining data is recorded in the <tt>FL_PLSPILL</tt> table.</p>
+ * <pre>
+ * +-------------------------------+
+ * | FL_EVENT                      |
+ * +-------------------------------+
+ * | FLE_ID     : BIGINT PK        | (auto-gen sequence)
+ * | FLE_PAYLOAD: VARBINARY(16384) | (16kb payload)
+ * | FLE_SPILL  : BOOLEAN          | (true if payload spills)
+ * | FLE_CHANNEL: VARCHAR(64)      |
+ * +-------------------------------+
+ * </pre>
+ *
+ * <p><strong><tt>FL_PLSPILL</tt></strong>: This table holds payloads in excess
+ * of 16kb and relates back to the <tt>FL_EVENT</tt> table using foreign key
+ * reference via <tt>FLP_EVENT</tt> column.</p>
+ * <pre>
+ * +---------------------+
+ * | FL_PLSPILL          |
+ * +---------------------+
+ * | FLP_EVENT  : BIGINT | (FK into FL_EVENT.FLE_ID)
+ * | FLP_SPILL  : BLOB   |
+ * +---------------------+
+ * </pre>
+ * <p><strong><tt>FL_HEADER</tt></strong>: The table that holds headers. This
+ * table contains name value pairs of headers less than or up to first 255
+ * bytes each. If a name is longer than 255 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in <tt>FL_NMSPILL</tt> table.
+ * Similarly if the value is longer than 255 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in <tt>FL_VLSPILL</tt> table. Each
+ * header record relates back to the <tt>FL_EVENT</tt> table using foreign key
+ * reference via <tt>FLH_EVENT</tt> column.</p>
+ * <pre>
+ * +--------------------------+
+ * | FL_HEADER                |
+ * +--------------------------+
+ * | FLH_ID     : BIGINT PK   | (auto-gen sequence)
+ * | FLH_EVENT  : BIGINT      | (FK into FL_EVENT.FLE_ID)
+ * | FLH_NAME   : VARCHAR(251)|
+ * | FLH_VALUE  : VARCHAR(251)|
+ * | FLH_NMSPILL: BOOLEAN     | (true if name spills)
+ * | FLH_VLSPILL: BOOLEAN     | (true if value spills)
+ * +--------------------------+
+ * </pre>
+ * <p><strong><tt>FL_NMSPILL</tt></strong>: The table that holds header names
+ * in excess of 255 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLN_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_NMSPILL           |
+ * +----------------------+
+ * | FLN_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLN_SPILL   : CLOB   |
+ * +----------------------+
+ * </pre>
+ * <p><strong><tt>FL_VLSPILL</tt></strong>: The table that holds header values
+ * in excess of 255 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLV_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_VLSPILL           |
+ * +----------------------+
+ * | FLV_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLV_SPILL   : CLOB   |
+ * +----------------------+
+ * </pre>
+ * </p>
+ * <p><strong>NOTE</strong>: The values that decide the spill boundary
+ * and storage length limits are defined in <tt>ConfigurationConstants</tt>
+ * class.</p>
+ * @see org.apache.flume.channel.jdbc.ConfigurationConstants
+ */
+public class DerbySchemaHandler implements SchemaHandler {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DerbySchemaHandler.class);
+
+  private static final String QUREY_SYSCHEMA_FLUME
+      = "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'";
+
+  private static final String SCHEMA_FLUME = "FLUME";
+
+  private static final String TABLE_FL_EVENT_NAME = "FL_EVENT";
+  private static final String TABLE_FL_EVENT = SCHEMA_FLUME + "."
+      + TABLE_FL_EVENT_NAME;
+  private static final String COLUMN_FLE_ID = "FLE_ID";
+  private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
+  private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
+  private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
+  private static final String INDEX_FLE_CHANNEL_NAME = "IDX_FLE_CHANNEL";
+  private static final String INDEX_FLE_CHANNEL = SCHEMA_FLUME + "."
+      + INDEX_FLE_CHANNEL_NAME;
+
+  private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
+  private static final String TABLE_FL_PLSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_PLSPILL_NAME;
+  private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
+  private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
+  private static final String INDEX_FLP_EVENT_NAME = "IDX_FLP_EVENT";
+  private static final String INDEX_FLP_EVENT = SCHEMA_FLUME + "."
+      + INDEX_FLP_EVENT_NAME;
+
+  private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
+  private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
+      + TABLE_FL_HEADER_NAME;
+  private static final String COLUMN_FLH_ID = "FLH_ID";
+  private static final String COLUMN_FLH_EVENT = "FLH_EVENT";
+  private static final String COLUMN_FLH_NAME = "FLH_NAME";
+  private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
+  private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
+  private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
+  private static final String INDEX_FLH_EVENT_NAME = "IDX_FLH_EVENT";
+  private static final String INDEX_FLH_EVENT = SCHEMA_FLUME + "."
+      + INDEX_FLH_EVENT_NAME;
+
+  private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
+  private static final String TABLE_FL_NMSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_NMSPILL_NAME;
+  private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
+  private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
+  private static final String INDEX_FLN_HEADER_NAME = "IDX_FLN_HEADER";
+  private static final String INDEX_FLN_HEADER = SCHEMA_FLUME + "."
+      + INDEX_FLN_HEADER_NAME;
+
+  private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
+  private static final String TABLE_FL_VLSPILL = SCHEMA_FLUME + "."
+      + TABLE_FL_VLSPILL_NAME;
+  private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
+  private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
+  private static final String INDEX_FLV_HEADER_NAME = "IDX_FLV_HEADER";
+  private static final String INDEX_FLV_HEADER = SCHEMA_FLUME + "."
+      + INDEX_FLV_HEADER_NAME;
+
+  public static final String QUERY_CREATE_SCHEMA_FLUME
+      = "CREATE SCHEMA " + SCHEMA_FLUME;
+
+  public static final String QUERY_CREATE_TABLE_FL_EVENT
+      = "CREATE TABLE " + TABLE_FL_EVENT + " ( "
+        + COLUMN_FLE_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
+        + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+        + COLUMN_FLE_PAYLOAD + " VARCHAR("
+        + ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD
+        + ") FOR BIT DATA, "
+        + COLUMN_FLE_CHANNEL + " VARCHAR("
+        + ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), "
+        + COLUMN_FLE_SPILL + " BOOLEAN)";
+
+  public static final String QUERY_CREATE_INDEX_FLE_CHANNEL
+      = "CREATE INDEX " + INDEX_FLE_CHANNEL + " ON " + TABLE_FL_EVENT
+        + " (" + COLUMN_FLE_CHANNEL + ")";
+
+  public static final String QUERY_CREATE_TABLE_FL_PLSPILL_FMT
+      = "CREATE TABLE " + TABLE_FL_PLSPILL + " ( "
+        + COLUMN_FLP_EVENT + " BIGINT, "
+        + COLUMN_FLP_SPILL + " BLOB{0})";
+
+  public static final String QUERY_CREATE_TABLE_FL_PLSPILL_FK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_PLSPILL_FMT,
+          ", FOREIGN KEY (" + COLUMN_FLP_EVENT + ") REFERENCES "
+              + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + ")"
+          );
+
+  public static final String QUERY_CREATE_TABLE_FL_PLSPILL_NOFK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_PLSPILL_FMT, "");
+
+  public static final String QUERY_CREATE_INDEX_FLP_EVENT
+      = "CREATE INDEX " + INDEX_FLP_EVENT + " ON " + TABLE_FL_PLSPILL
+        + " (" + COLUMN_FLP_EVENT + ")";
+
+  public static final String QUERY_CREATE_TABLE_FL_HEADER_FMT
+      = "CREATE TABLE " + TABLE_FL_HEADER + " ( "
+        + COLUMN_FLH_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
+        + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+        + COLUMN_FLH_EVENT + " BIGINT, "
+        + COLUMN_FLH_NAME + " VARCHAR("
+        + ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD + "), "
+        + COLUMN_FLH_VALUE + " VARCHAR("
+        + ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD + "), "
+        + COLUMN_FLH_NMSPILL + " BOOLEAN, "
+        + COLUMN_FLH_VLSPILL + " BOOLEAN{0})";
+
+  // DDL for FL_HEADER with a foreign key back to FL_EVENT.
+  public static final String QUERY_CREATE_TABLE_FL_HEADER_FK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_HEADER_FMT,
+          ", FOREIGN KEY (" + COLUMN_FLH_EVENT + ") REFERENCES "
+              + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + ")"
+        );
+
+  // Same DDL without the foreign-key clause (the {0} placeholder is empty).
+  public static final String QUERY_CREATE_TABLE_FL_HEADER_NOFK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_HEADER_FMT, "");
+
+  // Index on FL_HEADER(event-id) to speed per-event header lookups.
+  public static final String QUERY_CREATE_INDEX_FLH_EVENT
+      = "CREATE INDEX " + INDEX_FLH_EVENT + " ON " + TABLE_FL_HEADER
+         + " (" + COLUMN_FLH_EVENT + ")";
+
+  // DDL template for the header-name spill table; {0} optionally holds
+  // the foreign-key clause.
+  public static final String QUERY_CREATE_TABLE_FL_NMSPILL_FMT
+      = "CREATE TABLE " + TABLE_FL_NMSPILL + " ( "
+         + COLUMN_FLN_HEADER + " BIGINT, "
+         + COLUMN_FLN_SPILL + " VARCHAR("
+         + ConfigurationConstants.HEADER_NAME_SPILL_MAX_LENGTH + "){0})";
+
+  public static final String QUERY_CREATE_TABLE_FL_NMSPILL_FK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_NMSPILL_FMT,
+          ", FOREIGN KEY (" + COLUMN_FLN_HEADER + ") REFERENCES "
+              + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + ")"
+        );
+
+  public static final String QUERY_CREATE_TABLE_FL_NMSPILL_NOFK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_NMSPILL_FMT, "");
+
+  public static final String QUERY_CREATE_INDEX_FLN_HEADER
+      = "CREATE INDEX " + INDEX_FLN_HEADER + " ON " + TABLE_FL_NMSPILL
+         + " (" + COLUMN_FLN_HEADER + ")";
+
+  // DDL template for the header-value spill table; {0} optionally holds
+  // the foreign-key clause.
+  public static final String QUERY_CREATE_TABLE_FL_VLSPILL_FMT
+      = "CREATE TABLE " + TABLE_FL_VLSPILL + " ( "
+        + COLUMN_FLV_HEADER + " BIGINT, "
+        + COLUMN_FLV_SPILL + " VARCHAR("
+        + ConfigurationConstants.HEADER_VALUE_SPILL_MAX_LENGTH + "){0})";
+
+  public static final String QUERY_CREATE_TABLE_FL_VLSPILL_FK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_VLSPILL_FMT,
+            ", FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
+                + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + ")"
+        );
+
+  public static final String QUERY_CREATE_TABLE_FL_VLSPILL_NOFK
+      = MessageFormat.format(QUERY_CREATE_TABLE_FL_VLSPILL_FMT, "");
+
+  public static final String QUERY_CREATE_INDEX_FLV_HEADER
+      = "CREATE INDEX " + INDEX_FLV_HEADER + " ON " + TABLE_FL_VLSPILL
+         + " (" + COLUMN_FLV_HEADER + ")";
+
+  // Derby system-catalog query: looks up the column names of a table by
+  // table name (?1) and schema name (?2). Derby-specific (SYS.* catalogs).
+  public static final String COLUMN_LOOKUP_QUERY
+      = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+         + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+         + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+         + "SCHEMANAME = ? ))";
+
+  // Counts all events across all channels stored in FL_EVENT.
+  public static final String QUERY_CHANNEL_SIZE
+      = "SELECT COUNT(*) FROM " + TABLE_FL_EVENT;
+
+  // --- Insert statements (base event, payload spill, headers, spills) ---
+  public static final String STMT_INSERT_EVENT_BASE
+      = "INSERT INTO " + TABLE_FL_EVENT + " ("
+          + COLUMN_FLE_PAYLOAD + ", " + COLUMN_FLE_CHANNEL + ", "
+          + COLUMN_FLE_SPILL + ") VALUES ( ?, ?, ?)";
+
+  public static final String STMT_INSERT_EVENT_SPILL
+      = "INSERT INTO " + TABLE_FL_PLSPILL + " ("
+          + COLUMN_FLP_EVENT + ", " + COLUMN_FLP_SPILL + ") VALUES ( ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_BASE
+      = "INSERT INTO " + TABLE_FL_HEADER + " ("
+          + COLUMN_FLH_EVENT + ", " + COLUMN_FLH_NAME + ", " + COLUMN_FLH_VALUE
+          + ", " + COLUMN_FLH_NMSPILL + ", " + COLUMN_FLH_VLSPILL + ") VALUES "
+          + "( ?, ?, ?, ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_NAME_SPILL
+      = "INSERT INTO " + TABLE_FL_NMSPILL + " ("
+          + COLUMN_FLN_HEADER + ", " + COLUMN_FLN_SPILL + ") VALUES ( ?, ?)";
+
+  public static final String STMT_INSERT_HEADER_VALUE_SPILL
+      = "INSERT INTO " + TABLE_FL_VLSPILL + " ("
+          + COLUMN_FLV_HEADER + ", " + COLUMN_FLV_SPILL + ") VALUES ( ?, ?)";
+
+  // --- Fetch statements ---
+  // Fetches the oldest event (MIN id) for the given channel: FIFO order.
+  public static final String STMT_FETCH_PAYLOAD_BASE
+      = "SELECT " + COLUMN_FLE_ID + ", " + COLUMN_FLE_PAYLOAD + ", "
+          + COLUMN_FLE_SPILL + " FROM " + TABLE_FL_EVENT + " WHERE "
+          + COLUMN_FLE_ID + " = (SELECT MIN(" + COLUMN_FLE_ID + ") FROM "
+          + TABLE_FL_EVENT + " WHERE " + COLUMN_FLE_CHANNEL + " = ?)";
+
+  public static final String STMT_FETCH_PAYLOAD_SPILL
+      = "SELECT " + COLUMN_FLP_SPILL + " FROM " + TABLE_FL_PLSPILL + " WHERE "
+          + COLUMN_FLP_EVENT + " = ?";
+
+  public static final String STMT_FETCH_HEADER_BASE
+      = "SELECT " + COLUMN_FLH_ID + ", " + COLUMN_FLH_NAME + ", "
+          + COLUMN_FLH_VALUE + ", " + COLUMN_FLH_NMSPILL + ", "
+          + COLUMN_FLH_VLSPILL + " FROM " + TABLE_FL_HEADER + " WHERE "
+          + COLUMN_FLH_EVENT + " = ?";
+
+  public static final String STMT_FETCH_HEADER_NAME_SPILL
+      = "SELECT " + COLUMN_FLN_SPILL + " FROM " + TABLE_FL_NMSPILL
+          + " WHERE " + COLUMN_FLN_HEADER + " = ?";
+
+  public static final String STMT_FETCH_HEADER_VALUE_SPILL
+      = "SELECT " + COLUMN_FLV_SPILL + " FROM " + TABLE_FL_VLSPILL
+          + " WHERE " + COLUMN_FLV_HEADER + " = ?";
+
+  // --- Delete statements (used during destructive fetch) ---
+  public static final String STMT_DELETE_HEADER_VALUE_SPILL
+      = "DELETE FROM " + TABLE_FL_VLSPILL + " WHERE "
+          + COLUMN_FLV_HEADER + " = ?";
+
+  public static final String STMT_DELETE_HEADER_NAME_SPILL
+      = "DELETE FROM " + TABLE_FL_NMSPILL + " WHERE "
+          + COLUMN_FLN_HEADER + " = ?";
+
+  public static final String STMT_DELETE_EVENT_SPILL
+      = "DELETE FROM " + TABLE_FL_PLSPILL + " WHERE "
+          + COLUMN_FLP_EVENT + " = ?";
+
+  public static final String STMT_DELETE_HEADER_BASE
+      = "DELETE FROM " + TABLE_FL_HEADER + " WHERE "
+          + COLUMN_FLH_EVENT + " = ?";
+
+  public static final String STMT_DELETE_EVENT_BASE
+      = "DELETE FROM " + TABLE_FL_EVENT + " WHERE "
+          + COLUMN_FLE_ID + " = ?";
+
+  // Data source supplying all connections used by this handler.
+  private final DataSource dataSource;
+
+  /**
+   * Creates a schema handler that operates against the given data source.
+   *
+   * @param dataSource the data source used for all schema operations
+   */
+  protected DerbySchemaHandler(DataSource dataSource) {
+    this.dataSource = dataSource;
+  }
+
+  /**
+   * Checks whether the FLUME schema exists by querying the Derby system
+   * catalog.
+   *
+   * @return true if the FLUME schema is present, false otherwise
+   * @throws JdbcChannelException if the catalog query fails
+   */
+  @Override
+  public boolean schemaExists() {
+    Connection connection = null;
+    Statement stmt = null;
+    try {
+      connection = dataSource.getConnection();
+      stmt = connection.createStatement();
+      // NOTE: the constant name carries a historical typo ("QUREY"); it is
+      // referenced as-declared to keep the interface unchanged.
+      ResultSet rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
+      if (!rset.next()) {
+        LOGGER.warn("Schema for FLUME does not exist");
+        return false;
+      }
+
+      String flumeSchemaId = rset.getString(1);
+      LOGGER.debug("Flume schema ID: " + flumeSchemaId);
+
+      connection.commit();
+    } catch (SQLException ex) {
+      // BUG FIX: only roll back if a connection was actually obtained --
+      // dataSource.getConnection() itself may have thrown, in which case
+      // the old unguarded rollback would raise a NullPointerException that
+      // masked the original SQLException.
+      if (connection != null) {
+        try {
+          connection.rollback();
+        } catch (SQLException ex2) {
+          LOGGER.error("Unable to rollback transaction", ex2);
+        }
+      }
+      throw new JdbcChannelException("Unable to query schema", ex);
+    } finally {
+      // Closing the statement also closes its result set.
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close schema lookup stmt", ex);
+        }
+      }
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close connection", ex);
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Creates the FLUME schema and all channel tables, optionally with
+   * foreign-key constraints and lookup indexes.
+   *
+   * @param createForeignKeys whether dependent tables get FK constraints
+   * @param createIndex whether lookup indexes are created
+   */
+  @Override
+  public void createSchemaObjects(boolean createForeignKeys,
+      boolean createIndex) {
+    runQuery(QUERY_CREATE_SCHEMA_FLUME);
+    runQuery(QUERY_CREATE_TABLE_FL_EVENT);
+
+    // Dependent tables, created in dependency order, with or without
+    // foreign-key constraints.
+    String[] tableQueries = createForeignKeys
+        ? new String[] {
+            QUERY_CREATE_TABLE_FL_PLSPILL_FK,
+            QUERY_CREATE_TABLE_FL_HEADER_FK,
+            QUERY_CREATE_TABLE_FL_NMSPILL_FK,
+            QUERY_CREATE_TABLE_FL_VLSPILL_FK }
+        : new String[] {
+            QUERY_CREATE_TABLE_FL_PLSPILL_NOFK,
+            QUERY_CREATE_TABLE_FL_HEADER_NOFK,
+            QUERY_CREATE_TABLE_FL_NMSPILL_NOFK,
+            QUERY_CREATE_TABLE_FL_VLSPILL_NOFK };
+    for (String tableQuery : tableQueries) {
+      runQuery(tableQuery);
+    }
+
+    if (createIndex) {
+      String[] indexQueries = {
+          QUERY_CREATE_INDEX_FLE_CHANNEL,
+          QUERY_CREATE_INDEX_FLH_EVENT,
+          QUERY_CREATE_INDEX_FLP_EVENT,
+          QUERY_CREATE_INDEX_FLN_HEADER,
+          QUERY_CREATE_INDEX_FLV_HEADER };
+      for (String indexQuery : indexQueries) {
+        runQuery(indexQuery);
+      }
+    }
+  }
+
+  @Override
+  public void validateSchema() {
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_EVENT_NAME,
+        COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL,
+        COLUMN_FLE_SPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLSPILL_NAME,
+        COLUMN_FLP_EVENT, COLUMN_FLP_SPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_HEADER_NAME,
+        COLUMN_FLH_ID, COLUMN_FLH_EVENT, COLUMN_FLH_NAME, COLUMN_FLH_VALUE,
+        COLUMN_FLH_NMSPILL, COLUMN_FLH_VLSPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_NMSPILL_NAME,
+        COLUMN_FLN_HEADER, COLUMN_FLN_SPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_VLSPILL_NAME,
+        COLUMN_FLV_HEADER, COLUMN_FLV_SPILL);
+  }
+
+  /**
+   * Verifies that the given table has exactly the expected columns by
+   * querying the Derby system catalog.
+   *
+   * @param schemaName the schema the table belongs to
+   * @param tableName the table to verify
+   * @param columns the complete set of expected column names
+   * @throws JdbcChannelException if the lookup fails or the actual columns
+   *         differ from the expected set
+   */
+  private void verifyTableStructure(String schemaName, String tableName,
+      String... columns) {
+    Set<String> columnNames = new HashSet<String>();
+    Connection connection = null;
+    PreparedStatement pStmt = null;
+    try {
+      connection = dataSource.getConnection();
+      pStmt = connection.prepareStatement(COLUMN_LOOKUP_QUERY);
+      pStmt.setString(1, tableName);
+      pStmt.setString(2, schemaName);
+      ResultSet rset = pStmt.executeQuery();
+
+      while (rset.next()) {
+        columnNames.add(rset.getString(1));
+      }
+      connection.commit();
+    } catch (SQLException ex) {
+      // BUG FIX: guard against a null connection -- getConnection() itself
+      // may have thrown, and the old unguarded rollback would NPE and mask
+      // the original SQLException.
+      if (connection != null) {
+        try {
+          connection.rollback();
+        } catch (SQLException ex2) {
+          LOGGER.error("Unable to rollback transaction", ex2);
+        }
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + COLUMN_LOOKUP_QUERY + ": 1=" + tableName + ", 2=" + schemaName, ex);
+    } finally {
+      // BUG FIX: the connection close was previously nested inside the
+      // pStmt != null check, which leaked the connection whenever
+      // prepareStatement failed. The two closes are now independent.
+      if (pStmt != null) {
+        try {
+          pStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+      }
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close connection", ex);
+        }
+      }
+    }
+
+    // Any column found in the catalog but not in the expected list remains
+    // in columnDiff; combined with the size check this detects both missing
+    // and unexpected columns.
+    Set<String> columnDiff = new HashSet<String>();
+    columnDiff.addAll(columnNames);
+
+    // Build the expected-column string for diagnostics.
+    StringBuilder sb = new StringBuilder("{");
+    boolean first = true;
+    for (String column : columns) {
+      columnDiff.remove(column);
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(column);
+    }
+    sb.append("}");
+
+    String expectedColumns = sb.toString();
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Table " + schemaName + "." + tableName
+          + " expected columns: " + expectedColumns + ", actual columns: "
+          + columnNames);
+    }
+
+    if (columnNames.size() != columns.length || columnDiff.size() != 0) {
+      throw new JdbcChannelException("Expected table " + schemaName + "."
+          + tableName + " to have columns: " + expectedColumns + ". Instead "
+          + "found columns: " + columnNames);
+    }
+  }
+
+  /**
+   * Runs a single statement (typically DDL) on a fresh connection and
+   * commits, logging any unexpected result set or the update count.
+   *
+   * @param query the SQL statement to execute
+   * @throws JdbcChannelException if execution fails
+   */
+  private void runQuery(String query) {
+    Connection connection = null;
+    Statement stmt = null;
+    try {
+      connection = dataSource.getConnection();
+      stmt = connection.createStatement();
+      if (stmt.execute(query)) {
+        // A result set is not expected here; drain it so we can report
+        // its size for diagnostics.
+        ResultSet rset = stmt.getResultSet();
+        int count = 0;
+        while (rset.next()) {
+          count++;
+        }
+        LOGGER.info("QUERY(" + query + ") produced unused resultset with "
+            + count + " rows");
+      } else {
+        int updateCount = stmt.getUpdateCount();
+        LOGGER.info("QUERY(" + query + ") Update count: " + updateCount);
+      }
+      connection.commit();
+    } catch (SQLException ex) {
+      // BUG FIX: guard against a null connection -- getConnection() itself
+      // may have thrown, and the old unguarded rollback would NPE and mask
+      // the original SQLException.
+      if (connection != null) {
+        try {
+          connection.rollback();
+        } catch (SQLException ex2) {
+          LOGGER.error("Unable to rollback transaction", ex2);
+        }
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + query, ex);
+    } finally {
+      // BUG FIX: the connection close was previously nested inside the
+      // stmt != null check, which leaked the connection whenever
+      // createStatement failed. The two closes are now independent.
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+      }
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close connection", ex);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void storeEvent(PersistableEvent pe, Connection connection) {
+    // First populate the main event table
+    byte[] basePayload = pe.getBasePayload();
+    byte[] spillPayload = pe.getSpillPayload();
+    boolean hasSpillPayload = (spillPayload != null);
+    String channelName = pe.getChannelName();
+
+    LOGGER.debug("Preparing insert event: " + pe);
+
+    PreparedStatement baseEventStmt = null;
+    PreparedStatement spillEventStmt = null;
+    PreparedStatement baseHeaderStmt = null;
+    PreparedStatement headerNameSpillStmt = null;
+    PreparedStatement headerValueSpillStmt = null;
+    try {
+      baseEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_BASE,
+                          Statement.RETURN_GENERATED_KEYS);
+      baseEventStmt.setBytes(1, basePayload);
+      baseEventStmt.setString(2, channelName);
+      baseEventStmt.setBoolean(3, hasSpillPayload);
+
+      int baseEventCount = baseEventStmt.executeUpdate();
+      if (baseEventCount != 1) {
+        throw new JdbcChannelException("Invalid update count on base "
+            + "event insert: " + baseEventCount);
+      }
+      // Extract event ID and set it
+      ResultSet eventIdResult = baseEventStmt.getGeneratedKeys();
+
+      if (!eventIdResult.next()) {
+        throw new JdbcChannelException("Unable to retrieive inserted event-id");
+      }
+
+      long eventId = eventIdResult.getLong(1);
+      pe.setEventId(eventId);
+
+      // Persist the payload spill
+      if (hasSpillPayload) {
+        spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
+        spillEventStmt.setLong(1, eventId);
+        spillEventStmt.setBinaryStream(2,
+            new ByteArrayInputStream(spillPayload), spillPayload.length);
+        int spillEventCount = spillEventStmt.executeUpdate();
+        if (spillEventCount != 1) {
+          throw new JdbcChannelException("Invalid update count on spill "
+              + "event insert: " + spillEventCount);
+        }
+      }
+
+      // Persist the headers
+      List<HeaderEntry> headers = pe.getHeaderEntries();
+      if (headers != null && headers.size() > 0) {
+        List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
+        List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();
+
+        baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
+                                Statement.RETURN_GENERATED_KEYS);
+        Iterator<HeaderEntry> it = headers.iterator();
+        while (it.hasNext()) {
+          HeaderEntry entry = it.next();
+          SpillableString name = entry.getName();
+          SpillableString value = entry.getValue();
+          baseHeaderStmt.setLong(1, eventId);
+          baseHeaderStmt.setString(2, name.getBase());
+          baseHeaderStmt.setString(3, value.getBase());
+          baseHeaderStmt.setBoolean(4, name.hasSpill());
+          baseHeaderStmt.setBoolean(5, value.hasSpill());
+
+          int updateCount = baseHeaderStmt.executeUpdate();
+          if (updateCount != 1) {
+            throw new JdbcChannelException("Unexpected update header count: " + updateCount);
+          }
+          ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
+          if (!headerIdResultSet.next()) {
+            throw new JdbcChannelException(
+                "Unable to retrieve inserted header id");
+          }
+          long headerId = headerIdResultSet.getLong(1);
+          entry.setId(headerId);
+
+          if (name.hasSpill()) {
+            headerWithNameSpill.add(entry);
+          }
+
+          if (value.hasSpill()) {
+            headerWithValueSpill.add(entry);
+          }
+        }
+
+        // Persist header name spills
+        if (headerWithNameSpill.size() > 0) {
+          LOGGER.debug("Number of headers with name spill: "
+                  + headerWithNameSpill.size());
+
+          headerNameSpillStmt =
+              connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL);
+
+          for (HeaderEntry entry : headerWithNameSpill) {
+            String nameSpill = entry.getName().getSpill();
+
+            headerNameSpillStmt.setLong(1, entry.getId());
+            headerNameSpillStmt.setString(2, nameSpill);
+            headerNameSpillStmt.addBatch();
+          }
+
+          int[] nameSpillUpdateCount = headerNameSpillStmt.executeBatch();
+          if (nameSpillUpdateCount.length != headerWithNameSpill.size()) {
+            throw new JdbcChannelException("Unexpected update count for header "
+                + "name spills: expected " + headerWithNameSpill.size() + ", "
+                + "found " + nameSpillUpdateCount.length);
+          }
+
+          for (int i = 0; i < nameSpillUpdateCount.length; i++) {
+            if (nameSpillUpdateCount[i] != 1) {
+              throw new JdbcChannelException("Unexpected update count for "
+                  + "header name spill at position " + i + ", value: "
+                  + nameSpillUpdateCount[i]);
+            }
+          }
+        }
+
+        // Persist header value spills
+        if (headerWithValueSpill.size() > 0) {
+          LOGGER.debug("Number of headers with value spill: "
+              + headerWithValueSpill.size());
+
+          headerValueSpillStmt =
+              connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
+
+          for (HeaderEntry entry : headerWithValueSpill) {
+            String valueSpill = entry.getValue().getSpill();
+
+            headerValueSpillStmt.setLong(1, entry.getId());
+            headerValueSpillStmt.setString(2, valueSpill);
+            headerValueSpillStmt.addBatch();
+          }
+
+          int[] valueSpillUpdateCount = headerValueSpillStmt.executeBatch();
+          if (valueSpillUpdateCount.length != headerWithValueSpill.size()) {
+            throw new JdbcChannelException("Unexpected update count for header "
+                + "value spills: expected " + headerWithValueSpill.size() + ", "
+                + "found " + valueSpillUpdateCount.length);
+          }
+
+          for (int i = 0; i < valueSpillUpdateCount.length; i++) {
+            if (valueSpillUpdateCount[i] != 1) {
+              throw new JdbcChannelException("Unexpected update count for "
+                  + "header value spill at position " + i + ", value: "
+                  + valueSpillUpdateCount[i]);
+            }
+          }
+        }
+      }
+    } catch (SQLException ex) {
+      throw new JdbcChannelException("Unable to persist event: " + pe, ex);
+    } finally {
+      if (baseEventStmt != null) {
+        try {
+          baseEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base event statement", ex);
+        }
+      }
+      if (spillEventStmt != null) {
+        try {
+          spillEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close spill event statement", ex);
+        }
+      }
+      if (baseHeaderStmt != null) {
+        try {
+          baseHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base header statement", ex);
+        }
+      }
+      if (headerNameSpillStmt != null) {
+        try {
+          headerNameSpillStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close header name spill statement", ex);
+        }
+      }
+      if (headerValueSpillStmt != null) {
+        try {
+          headerValueSpillStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close header value spill statement", ex);
+        }
+      }
+    }
+
+    LOGGER.debug("Event persisted: " + pe);
+  }
+
+  @Override
+  public PersistableEvent fetchAndDeleteEvent(String channel,
+      Connection connection) {
+    PersistableEvent.Builder peBuilder = null;
+    PreparedStatement baseEventFetchStmt = null;
+    PreparedStatement spillEventFetchStmt = null;
+    InputStream payloadInputStream = null;
+    PreparedStatement baseHeaderFetchStmt = null;
+    PreparedStatement nameSpillHeaderStmt = null;
+    PreparedStatement valueSpillHeaderStmt = null;
+    PreparedStatement deleteSpillEventStmt = null;
+    PreparedStatement deleteNameSpillHeaderStmt = null;
+    PreparedStatement deleteValueSpillHeaderStmt = null;
+    PreparedStatement deleteBaseHeaderStmt = null;
+    PreparedStatement deleteBaseEventStmt = null;
+    try {
+      baseEventFetchStmt = connection.prepareStatement(STMT_FETCH_PAYLOAD_BASE);
+      baseEventFetchStmt.setString(1, channel);
+      ResultSet rsetBaseEvent = baseEventFetchStmt.executeQuery();
+
+      if (!rsetBaseEvent.next()) {
+        // Empty result set
+        LOGGER.debug("No events found for channel: " + channel);
+        return null;
+      }
+
+      // Populate event id, payload
+      long eventId = rsetBaseEvent.getLong(1);
+      peBuilder = new PersistableEvent.Builder(channel, eventId);
+      peBuilder.setBasePayload(rsetBaseEvent.getBytes(2));
+      boolean hasSpill = rsetBaseEvent.getBoolean(3);
+
+      if (hasSpill) {
+        spillEventFetchStmt =
+            connection.prepareStatement(STMT_FETCH_PAYLOAD_SPILL);
+
+        spillEventFetchStmt.setLong(1, eventId);
+        ResultSet rsetSpillEvent = spillEventFetchStmt.executeQuery();
+        if (!rsetSpillEvent.next()) {
+          throw new JdbcChannelException("Payload spill expected but not "
+              + "found for event: " + eventId);
+        }
+        Blob payloadSpillBlob = rsetSpillEvent.getBlob(1);
+        payloadInputStream = payloadSpillBlob.getBinaryStream();
+        ByteArrayOutputStream spillStream = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int length = 0;
+        while ((length = payloadInputStream.read(buffer)) != -1) {
+          spillStream.write(buffer, 0, length);
+        }
+        peBuilder.setSpillPayload(spillStream.toByteArray());
+
+        // Delete this spill
+        deleteSpillEventStmt =
+            connection.prepareStatement(STMT_DELETE_EVENT_SPILL);
+        deleteSpillEventStmt.setLong(1, eventId);
+
+        int updateCount = deleteSpillEventStmt.executeUpdate();
+        if (updateCount != 1) {
+          throw new JdbcChannelException("Unexpected row count for spill "
+              + "delete: " + updateCount);
+        }
+      }
+
+      if (rsetBaseEvent.next()) {
+        throw new JdbcChannelException("More than expected events retrieved");
+      }
+
+      // Populate headers
+      List<Long> nameSpillHeaders = null;
+      List<Long> valueSpillHeaders = null;
+      baseHeaderFetchStmt = connection.prepareStatement(STMT_FETCH_HEADER_BASE);
+      baseHeaderFetchStmt.setLong(1, eventId);
+      int headerCount = 0; // for later delete validation
+
+      ResultSet rsetBaseHeader = baseHeaderFetchStmt.executeQuery();
+      while (rsetBaseHeader.next()) {
+        headerCount++;
+        long headerId = rsetBaseHeader.getLong(1);
+        String baseName = rsetBaseHeader.getString(2);
+        String baseValue = rsetBaseHeader.getString(3);
+        boolean hasNameSpill = rsetBaseHeader.getBoolean(4);
+        boolean hasValueSpill = rsetBaseHeader.getBoolean(5);
+
+        peBuilder.setHeader(headerId, baseName, baseValue);
+        if (hasNameSpill) {
+          if (nameSpillHeaders == null) {
+            nameSpillHeaders = new ArrayList<Long>();
+          }
+          nameSpillHeaders.add(headerId);
+        }
+
+        if (hasValueSpill) {
+          if (valueSpillHeaders == null) {
+            valueSpillHeaders = new ArrayList<Long>();
+          }
+          valueSpillHeaders.add(headerId);
+        }
+      }
+
+      if (nameSpillHeaders != null) {
+
+        nameSpillHeaderStmt =
+            connection.prepareStatement(STMT_FETCH_HEADER_NAME_SPILL);
+
+        deleteNameSpillHeaderStmt =
+            connection.prepareStatement(STMT_DELETE_HEADER_NAME_SPILL);
+        for (long headerId : nameSpillHeaders) {
+          nameSpillHeaderStmt.setLong(1, headerId);
+          ResultSet rsetHeaderNameSpill = nameSpillHeaderStmt.executeQuery();
+          if (!rsetHeaderNameSpill.next()) {
+            throw new JdbcChannelException("Name spill was set for header "
+                + headerId + " but was not found");
+          }
+          String nameSpill = rsetHeaderNameSpill.getString(1);
+
+          peBuilder.setHeaderNameSpill(headerId, nameSpill);
+          deleteNameSpillHeaderStmt.setLong(1, headerId);
+          deleteNameSpillHeaderStmt.addBatch();
+        }
+
+        // Delete header name spills
+        int[] headerNameSpillDelete = deleteNameSpillHeaderStmt.executeBatch();
+        if (headerNameSpillDelete.length != nameSpillHeaders.size()) {
+          throw new JdbcChannelException("Unexpected number of header name "
+              + "spill deletes: expected " + nameSpillHeaders.size()
+              + ", found: " + headerNameSpillDelete.length);
+        }
+
+        for (int numRowsAffected : headerNameSpillDelete) {
+          if (numRowsAffected != 1) {
+            throw new JdbcChannelException("Unexpected number of deleted rows "
+                + "for header name spill deletes: " + numRowsAffected);
+          }
+        }
+      }
+
+      if (valueSpillHeaders != null) {
+        valueSpillHeaderStmt =
+            connection.prepareStatement(STMT_FETCH_HEADER_VALUE_SPILL);
+
+        deleteValueSpillHeaderStmt =
+            connection.prepareStatement(STMT_DELETE_HEADER_VALUE_SPILL);
+        for (long headerId: valueSpillHeaders) {
+          valueSpillHeaderStmt.setLong(1, headerId);
+          ResultSet rsetHeaderValueSpill = valueSpillHeaderStmt.executeQuery();
+          if (!rsetHeaderValueSpill.next()) {
+            throw new JdbcChannelException("Value spill was set for header "
+                + headerId + " but was not found");
+          }
+          String valueSpill = rsetHeaderValueSpill.getString(1);
+
+          peBuilder.setHeaderValueSpill(headerId, valueSpill);
+          deleteValueSpillHeaderStmt.setLong(1, headerId);
+          deleteValueSpillHeaderStmt.addBatch();
+        }
+        // Delete header value spills
+        int[] headerValueSpillDelete = deleteValueSpillHeaderStmt.executeBatch();
+        if (headerValueSpillDelete.length != valueSpillHeaders.size()) {
+          throw new JdbcChannelException("Unexpected number of header value "
+              + "spill deletes: expected " + valueSpillHeaders.size()
+              + ", found: " + headerValueSpillDelete.length);
+        }
+
+        for (int numRowsAffected : headerValueSpillDelete) {
+          if (numRowsAffected != 1) {
+            throw new JdbcChannelException("Unexpected number of deleted rows "
+                + "for header value spill deletes: " + numRowsAffected);
+          }
+        }
+      }
+
+      // Now delete Headers
+      if (headerCount > 0) {
+        deleteBaseHeaderStmt =
+            connection.prepareStatement(STMT_DELETE_HEADER_BASE);
+        deleteBaseHeaderStmt.setLong(1, eventId);
+
+        int rowCount = deleteBaseHeaderStmt.executeUpdate();
+        if (rowCount != headerCount) {
+          throw new JdbcChannelException("Unexpected base header delete count: "
+              + "expected: " + headerCount + ", found: " + rowCount);
+        }
+      }
+
+      // Now delete the Event
+      deleteBaseEventStmt = connection.prepareStatement(STMT_DELETE_EVENT_BASE);
+      deleteBaseEventStmt.setLong(1, eventId);
+      int rowCount = deleteBaseEventStmt.executeUpdate();
+
+      if (rowCount != 1) {
+        throw new JdbcChannelException("Unexpected row count for delete of "
+            + "event-id: " + eventId + ", count: " + rowCount);
+      }
+
+    } catch (SQLException ex) {
+      throw new JdbcChannelException("Unable to retrieve event", ex);
+    } catch (IOException ex) {
+      throw new JdbcChannelException("Unable to read data", ex);
+    } finally {
+      if (payloadInputStream != null) {
+        try {
+          payloadInputStream.close();
+        } catch (IOException ex) {
+          LOGGER.error("Unable to close payload spill stream", ex);
+        }
+      }
+      if (baseEventFetchStmt != null) {
+        try {
+          baseEventFetchStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base event fetch statement", ex);
+        }
+      }
+      if (spillEventFetchStmt != null) {
+        try {
+          spillEventFetchStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close spill event fetch statment", ex);
+        }
+      }
+      if (deleteSpillEventStmt != null) {
+        try {
+          deleteSpillEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close event spill delete statement", ex);
+        }
+      }
+      if (baseHeaderFetchStmt != null) {
+        try {
+          baseHeaderFetchStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base header fetch statement", ex);
+        }
+      }
+      if (nameSpillHeaderStmt != null) {
+        try {
+          nameSpillHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close name spill fetch statement", ex);
+        }
+      }
+      if (valueSpillHeaderStmt != null) {
+        try {
+          valueSpillHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close value spill fetch statement", ex);
+        }
+      }
+      if (deleteNameSpillHeaderStmt != null) {
+        try {
+          deleteNameSpillHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close value spill delete statement", ex);
+        }
+      }
+      if (deleteValueSpillHeaderStmt != null) {
+        try {
+          deleteValueSpillHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close value spill delete statement", ex);
+        }
+      }
+      if (deleteBaseHeaderStmt != null) {
+        try {
+          deleteBaseHeaderStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base header delete statement", ex);
+        }
+      }
+      if (deleteBaseEventStmt != null) {
+        try {
+          deleteBaseEventStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close base event delete statement", ex);
+        }
+      }
+    }
+
+    return peBuilder.build();
+  }
+
+  @Override
+  public long getChannelSize(Connection connection) {
+    long size = 0L;
+    Statement stmt = null;
+    try {
+      stmt = connection.createStatement();
+      stmt.execute(QUERY_CHANNEL_SIZE);
+      ResultSet rset = stmt.getResultSet();
+      if (!rset.next()) {
+        throw new JdbcChannelException("Failed to determine channel size: "
+              + "Query (" + QUERY_CHANNEL_SIZE
+              + ") did not produce any results");
+      }
+
+      size = rset.getLong(1);
+      connection.commit();
+    } catch (SQLException ex) {
+      try {
+        connection.rollback();
+      } catch (SQLException ex2) {
+        LOGGER.error("Unable to rollback transaction", ex2);
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + QUERY_CHANNEL_SIZE, ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+      }
+    }
+
+    return size;
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
new file mode 100644
index 0000000..01caef3
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
+import org.apache.commons.pool.KeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+import org.apache.flume.channel.jdbc.DatabaseType;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.apache.flume.channel.jdbc.JdbcChannelProvider;
+import org.apache.flume.channel.jdbc.TransactionIsolation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcChannelProviderImpl implements JdbcChannelProvider {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(JdbcChannelProviderImpl.class);
+
+  private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME
+      = "org.apache.derby.jdbc.EmbeddedDriver";
+
+  private static final String DEFAULT_DRIVER_CLASSNAME = EMBEDDED_DERBY_DRIVER_CLASSNAME;
+  private static final String DEFAULT_USERNAME = "sa";
+  private static final String DEFAULT_PASSWORD = "";
+  private static final String DEFAULT_DBTYPE = "DERBY";
+
+  /** The connection pool. */
+  private GenericObjectPool connectionPool;
+
+  /** The statement cache pool */
+  private KeyedObjectPoolFactory statementPool;
+
+  /** The data source. */
+  private DataSource dataSource;
+
+  /** The database type. */
+  private DatabaseType databaseType;
+
+  /** The schema handler. */
+  private SchemaHandler schemaHandler;
+
+  /** Transaction factory */
+  private JdbcTransactionFactory txFactory;
+
+  /** Connection URL */
+  private String connectUrl;
+
+  /** Driver Class Name */
+  private String driverClassName;
+
+  /** Capacity Counter if one is needed */
+  private long maxCapacity = 0L;
+
+  /** The current size of the channel. */
+  private AtomicLong currentSize = new AtomicLong(0L);
+
+  @Override
+  public void initialize(Context context) {
+    LOGGER.debug("Initializing JDBC Channel provider");
+
+    initializeSystemProperties(context);
+    initializeDataSource(context);
+    initializeSchema(context);
+    initializeChannelState(context);
+  }
+
+  private void initializeSystemProperties(Context context) {
+    Map<String, String> sysProps = new HashMap<String, String>();
+
+    Map<String, String> sysPropsOld = context.getSubProperties(
+        ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX);
+
+    if (sysPropsOld.size() > 0) {
+      LOGGER.warn("Long form configuration prefix \""
+          + ConfigurationConstants.OLD_CONFIG_JDBC_SYSPROP_PREFIX
+          + "\" is deprecated. Please use the short form prefix \""
+          + ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX
+          + "\" instead.");
+
+      sysProps.putAll(sysPropsOld);
+    }
+
+    Map<String, String> sysPropsNew = context.getSubProperties(
+        ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX);
+
+    // Override the deprecated values with the non-deprecated
+    if (sysPropsNew.size() > 0) {
+      sysProps.putAll(sysPropsNew);
+    }
+
+    for (String key: sysProps.keySet()) {
+      String value = sysProps.get(key);
+      if (key != null && value != null) {
+        System.setProperty(key, value);
+      }
+    }
+  }
+
+  private void initializeChannelState(Context context) {
+
+    String maxCapacityStr = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_MAX_CAPACITY,
+        ConfigurationConstants.OLD_CONFIG_MAX_CAPACITY, "0");
+
+    long maxCapacitySpecified = 0;
+    try {
+      maxCapacitySpecified = Long.parseLong(maxCapacityStr);
+    } catch (NumberFormatException nfe) {
+      LOGGER.warn("Invalid value specified for maximum channel capacity: "
+          + maxCapacityStr, nfe);
+    }
+
+    if (maxCapacitySpecified > 0) {
+      this.maxCapacity = maxCapacitySpecified;
+      LOGGER.info("Maximum channel capacity: {}", maxCapacity);
+    } else {
+      LOGGER.warn("JDBC channel will operate without a capacity limit.");
+    }
+
+    if (maxCapacity > 0) {
+      // Initialize current size
+      JdbcTransactionImpl tx = null;
+      try {
+        tx = getTransaction();
+        tx.begin();
+        Connection conn = tx.getConnection();
+
+        currentSize.set(schemaHandler.getChannelSize(conn));
+        tx.commit();
+      } catch (Exception ex) {
+        tx.rollback();
+        throw new JdbcChannelException("Failed to initialize current size", ex);
+      } finally {
+        if (tx != null) {
+          tx.close();
+        }
+      }
+
+      long currentSizeLong = currentSize.get();
+
+      if (currentSizeLong > maxCapacity) {
+        LOGGER.warn("The current size of channel (" + currentSizeLong
+            + ") is more than the specified maximum capacity (" + maxCapacity
+            + "). If this situation persists, it may require resizing and "
+            + "replanning of your deployment.");
+      }
+      LOGGER.info("Current channel size: {}", currentSizeLong);
+    }
+  }
+
+  private void initializeSchema(Context context) {
+    String createSchemaFlag = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_CREATE_SCHEMA,
+        ConfigurationConstants.OLD_CONFIG_CREATE_SCHEMA, "true");
+
+    boolean createSchema = Boolean.valueOf(createSchemaFlag);
+    LOGGER.debug("Create schema flag set to: " + createSchema);
+
+    // First check if the schema exists
+    schemaHandler = SchemaHandlerFactory.getHandler(databaseType, dataSource);
+
+    if (!schemaHandler.schemaExists()) {
+      if (!createSchema) {
+        throw new JdbcChannelException("Schema does not exist and "
+            + "auto-generation is disabled. Please enable auto-generation of "
+            + "schema and try again.");
+      }
+
+      String createIndexFlag = getConfigurationString(context,
+          ConfigurationConstants.CONFIG_CREATE_INDEX,
+          ConfigurationConstants.OLD_CONFIG_CREATE_INDEX, "true");
+
+      String createForeignKeysFlag = getConfigurationString(context,
+          ConfigurationConstants.CONFIG_CREATE_FK,
+          ConfigurationConstants.OLD_CONFIG_CREATE_FK, "true");
+
+
+      boolean createIndex = Boolean.valueOf(createIndexFlag);
+      if (!createIndex) {
+        LOGGER.warn("Index creation is disabled, indexes will not be created.");
+      }
+
+      boolean createForeignKeys = Boolean.valueOf(createForeignKeysFlag);
+      if (createForeignKeys) {
+        LOGGER.info("Foreign Key Constraints will be enabled.");
+      } else {
+        LOGGER.info("Foreign Key Constratins will be disabled.");
+      }
+
+      // Now create schema
+      schemaHandler.createSchemaObjects(createForeignKeys, createIndex);
+    }
+
+    // Validate all schema objects are as expected
+    schemaHandler.validateSchema();
+  }
+
+
+  @Override
+  public void close() {
+    try {
+      connectionPool.close();
+    } catch (Exception ex) {
+      throw new JdbcChannelException("Unable to close connection pool", ex);
+    }
+
+    if (databaseType.equals(DatabaseType.DERBY)
+        && driverClassName.equals(EMBEDDED_DERBY_DRIVER_CLASSNAME)) {
+      // Need to shut down the embedded Derby instance
+      if (connectUrl.startsWith("jdbc:derby:")) {
+        int index = connectUrl.indexOf(";");
+        String baseUrl = null;
+        if (index != -1) {
+          baseUrl = connectUrl.substring(0, index + 1);
+        } else {
+          baseUrl = connectUrl + ";";
+        }
+        String shutDownUrl = baseUrl + "shutdown=true";
+
+        LOGGER.debug("Attempting to shutdown embedded Derby using URL: "
+            + shutDownUrl);
+
+        try {
+          DriverManager.getConnection(shutDownUrl);
+        } catch (SQLException ex) {
+          // Shutdown for one db instance is expected to raise SQL STATE 45000
+          if (ex.getErrorCode() != 45000) {
+            throw new JdbcChannelException(
+                "Unable to shutdown embedded Derby: " + shutDownUrl
+                + " Error Code: " + ex.getErrorCode(), ex);
+          }
+          LOGGER.info("Embedded Derby shutdown raised SQL STATE "
+              + "45000 as expected.");
+        }
+      } else {
+        LOGGER.warn("Even though embedded Derby drvier was loaded, the connect "
+            + "URL is of an unexpected form: " + connectUrl + ". Therfore no "
+            + "attempt will be made to shutdown embedded Derby instance.");
+      }
+    }
+
+    dataSource = null;
+    txFactory = null;
+    schemaHandler = null;
+  }
+
+  @Override
+  public void persistEvent(String channel, Event event) {
+    PersistableEvent persistableEvent = new PersistableEvent(channel, event);
+    JdbcTransactionImpl tx = null;
+    try {
+      tx = getTransaction();
+      tx.begin();
+
+      if (maxCapacity > 0) {
+        long currentSizeLong = currentSize.get();
+        if (currentSizeLong >= maxCapacity) {
+          throw new JdbcChannelException("Channel capacity reached: "
+              + "maxCapacity: " + maxCapacity + ", currentSize: "
+              + currentSizeLong);
+        }
+      }
+
+      // Persist the persistableEvent
+      schemaHandler.storeEvent(persistableEvent, tx.getConnection());
+
+      tx.incrementPersistedEventCount();
+
+      tx.commit();
+    } catch (Exception ex) {
+      tx.rollback();
+      throw new JdbcChannelException("Failed to persist event", ex);
+    } finally {
+      if (tx != null) {
+        tx.close();
+      }
+    }
+
+    LOGGER.debug("Persisted event: {}", persistableEvent.getEventId());
+  }
+
+  @Override
+  public Event removeEvent(String channelName) {
+    PersistableEvent result = null;
+    JdbcTransactionImpl tx = null;
+    try {
+      tx = getTransaction();
+      tx.begin();
+
+      // Retrieve the persistableEvent
+      result = schemaHandler.fetchAndDeleteEvent(
+          channelName, tx.getConnection());
+
+      if (result != null) {
+        tx.incrementRemovedEventCount();
+      }
+
+      tx.commit();
+    } catch (Exception ex) {
+      tx.rollback();
+      throw new JdbcChannelException("Failed to persist event", ex);
+    } finally {
+      if (tx != null) {
+        tx.close();
+      }
+    }
+
+    if (result != null) {
+      LOGGER.debug("Removed event: {}", ((PersistableEvent) result).getEventId());
+    } else {
+      LOGGER.debug("No event found for removal");
+    }
+
+    return result;
+  }
+
+  @Override
+  public JdbcTransactionImpl getTransaction() {
+    return txFactory.get();
+  }
+
+  /**
+
+   * Initializes the datasource and the underlying connection pool.
+   * @param context
+   */
+  private void initializeDataSource(Context context) {
+    driverClassName = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
+        ConfigurationConstants.OLD_CONFIG_JDBC_DRIVER_CLASS, null);
+
+    connectUrl = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_URL,
+        ConfigurationConstants.OLD_CONFIG_URL, null);
+
+
+    String userName = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_USERNAME,
+        ConfigurationConstants.OLD_CONFIG_USERNAME, null);
+
+    String password = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_PASSWORD,
+        ConfigurationConstants.OLD_CONFIG_PASSWORD, null);
+
+    String jdbcPropertiesFile = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_JDBC_PROPS_FILE,
+        ConfigurationConstants.OLD_CONFIG_JDBC_PROPS_FILE, null);
+
+    String dbTypeName = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_DATABASE_TYPE,
+        ConfigurationConstants.OLD_CONFIG_DATABASE_TYPE, null);
+
+    // If connect URL is not specified, use embedded Derby
+    if (connectUrl == null || connectUrl.trim().length() == 0) {
+      LOGGER.warn("No connection URL specified. "
+          + "Using embedded derby database instance.");
+
+      driverClassName = DEFAULT_DRIVER_CLASSNAME;
+      userName = DEFAULT_USERNAME;
+      password = DEFAULT_PASSWORD;
+      dbTypeName = DEFAULT_DBTYPE;
+
+      String homePath = System.getProperty("user.home").replace('\\', '/');
+
+      String defaultDbDir = homePath + "/.flume/jdbc-channel";
+
+
+      File dbDir = new File(defaultDbDir);
+      String canonicalDbDirPath = null;
+
+      try {
+        canonicalDbDirPath = dbDir.getCanonicalPath();
+      } catch (IOException ex) {
+        throw new JdbcChannelException("Unable to find canonical path of dir: "
+            + defaultDbDir, ex);
+      }
+
+      if (!dbDir.exists()) {
+        if (!dbDir.mkdirs()) {
+          throw new JdbcChannelException("unable to create directory: "
+              + canonicalDbDirPath);
+        }
+      }
+
+      connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";
+
+      // No jdbc properties file will be used
+      jdbcPropertiesFile = null;
+
+      LOGGER.warn("Overriding values for - driver: " + driverClassName
+          + ", user: " + userName + "connectUrl: " + connectUrl
+          + ", jdbc properties file: " + jdbcPropertiesFile
+          + ", dbtype: " + dbTypeName);
+    }
+
+    // Right now only Derby and MySQL supported
+    databaseType = DatabaseType.getByName(dbTypeName);
+
+    switch (databaseType) {
+      case DERBY:
+      case MYSQL:
+        break;
+      default:
+        throw new JdbcChannelException("Database " + databaseType
+            + " not supported at this time");
+    }
+
+    // Register driver
+    if (driverClassName == null || driverClassName.trim().length() == 0) {
+      throw new JdbcChannelException("No jdbc driver specified");
+    }
+
+    try {
+      Class.forName(driverClassName);
+    } catch (ClassNotFoundException ex) {
+      throw new JdbcChannelException("Unable to load driver: "
+                  + driverClassName, ex);
+    }
+
+    // JDBC Properties
+    Properties jdbcProps = new Properties();
+
+    if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
+      File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
+      if (!jdbcPropsFile.exists()) {
+        throw new JdbcChannelException("Jdbc properties file does not exist: "
+            + jdbcPropertiesFile);
+      }
+
+      InputStream inStream = null;
+      try {
+        inStream = new FileInputStream(jdbcPropsFile);
+        jdbcProps.load(inStream);
+      } catch (IOException ex) {
+        throw new JdbcChannelException("Unable to load jdbc properties "
+            + "from file: " + jdbcPropertiesFile, ex);
+      } finally {
+        if (inStream != null) {
+          try {
+            inStream.close();
+          } catch (IOException ex) {
+            LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
+          }
+        }
+      }
+    }
+
+    if (userName != null) {
+      Object oldUser = jdbcProps.put("user", userName);
+      if (oldUser != null) {
+        LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
+      }
+    }
+
+    if (password != null) {
+      Object oldPass = jdbcProps.put("password", password);
+      if (oldPass != null) {
+        LOGGER.warn("Overriding password from the jdbc properties with "
+            + " the one specified explicitly.");
+      }
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("JDBC Properties {");
+      boolean first = true;
+      Enumeration<?> propertyKeys = jdbcProps.propertyNames();
+      while (propertyKeys.hasMoreElements()) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        String key = (String) propertyKeys.nextElement();
+        sb.append(key).append("=");
+        if (key.equalsIgnoreCase("password")) {
+          sb.append("*******");
+        } else {
+          sb.append(jdbcProps.get(key));
+        }
+      }
+
+      sb.append("}");
+
+      LOGGER.debug(sb.toString());
+    }
+
+    // Transaction Isolation
+    String txIsolation = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
+        ConfigurationConstants.OLD_CONFIG_TX_ISOLATION_LEVEL,
+        TransactionIsolation.READ_COMMITTED.getName());
+
+    TransactionIsolation txIsolationLevel =
+        TransactionIsolation.getByName(txIsolation);
+
+    LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);
+
+    // Setup Datasource
+    ConnectionFactory connFactory =
+        new DriverManagerConnectionFactory(connectUrl, jdbcProps);
+
+    connectionPool = new GenericObjectPool();
+
+    String maxActiveConnections = getConfigurationString(context,
+        ConfigurationConstants.CONFIG_MAX_CONNECTIONS,
+        ConfigurationConstants.OLD_CONFIG_MAX_CONNECTIONS, "10");
+
+    int maxActive = 10;
+    if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
+      try {
+        maxActive = Integer.parseInt(maxActiveConnections);
+      } catch (NumberFormatException nfe) {
+        LOGGER.warn("Max active connections has invalid value: "
+                + maxActiveConnections + ", Using default: " + maxActive);
+      }
+    }
+
+    LOGGER.debug("Max active connections for the pool: " + maxActive);
+    connectionPool.setMaxActive(maxActive);
+
+    statementPool = new GenericKeyedObjectPoolFactory(null);
+
+    // Creating the factory instance automatically wires the connection pool
+    new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
+        databaseType.getValidationQuery(), false, false,
+        txIsolationLevel.getCode());
+
+    dataSource = new PoolingDataSource(connectionPool);
+
+    txFactory = new JdbcTransactionFactory(dataSource, this);
+  }
+
+  /**
+   * A callback method invoked from individual transaction instances after
+   * a successful commit. The argument passed is the net number of events to
+   * be added to the current size as tracked by the provider.
+   * @param delta the net number of events to be added to reflect the current
+   *              size of the channel
+   */
+  protected void updateCurrentChannelSize(long delta) {
+    long currentSizeLong = currentSize.addAndGet(delta);
+    LOGGER.debug("channel size updated to: " + currentSizeLong);
+  }
+
+  /**
+   * Helper method to transition the configuration from the old long form
+   * style configuration to the new short form. If the value is specified for
+   * both the old and the new forms, the one associated with the new form
+   * takes precedence.
+   *
+   * @param context
+   * @param key the expected configuration key
+   * @param oldKey the deprecated configuration key
+   * @param defaultValue default value, null if no default
+   * @return the value associated with the key
+   */
+  private String getConfigurationString(Context context, String key,
+      String oldKey, String defaultValue) {
+
+    String oldValue = context.getString(oldKey);
+
+    if (oldValue != null && oldValue.length() > 0) {
+      LOGGER.warn("Long form configuration key \"" + oldKey
+          + "\" is deprecated. Please use the short form key \""
+          + key + "\" instead.");
+    }
+
+    String value = context.getString(key);
+
+    if (value == null) {
+      if (oldValue != null) {
+        value = oldValue;
+      } else {
+        value = defaultValue;
+      }
+    }
+
+    return value;
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
new file mode 100644
index 0000000..59ebdb5
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import javax.sql.DataSource;
+
+public class JdbcTransactionFactory extends ThreadLocal<JdbcTransactionImpl> {
+
+  private final DataSource dataSource;
+  private final JdbcChannelProviderImpl providerImpl;
+
+  protected JdbcTransactionFactory(DataSource dataSource,
+      JdbcChannelProviderImpl providerImpl) {
+    super();
+    this.dataSource = dataSource;
+    this.providerImpl = providerImpl;
+  }
+
+  @Override
+  protected JdbcTransactionImpl initialValue() {
+    return new JdbcTransactionImpl(dataSource, this, providerImpl);
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
new file mode 100644
index 0000000..6f3aecd
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcTransactionImpl implements Transaction {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(JdbcTransactionImpl.class);
+
+  private final DataSource dataSource;
+  private final JdbcChannelProviderImpl providerImpl;
+  private Connection connection;
+  private JdbcTransactionFactory txFactory;
+  private boolean active = true;
+
+  /** Reference count used to do the eventual commit.*/
+  private int count = 0;
+
+  /** Number of events successfully removed from the channel. */
+  private int removedEventCount = 0;
+
+  /** Number of events persisted to the channel. */
+  private int persistedEventCount = 0;
+
+  /** Flag that indicates if the transaction must be rolled back. */
+  private boolean rollback = false;
+
+  protected JdbcTransactionImpl(DataSource dataSource,
+      JdbcTransactionFactory factory, JdbcChannelProviderImpl provider) {
+    this.dataSource = dataSource;
+    txFactory = factory;
+    providerImpl = provider;
+  }
+
+  @Override
+  public void begin() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    if (count == 0) {
+      // Lease a connection now
+      try {
+        connection = dataSource.getConnection();
+      } catch (SQLException ex) {
+        throw new JdbcChannelException("Unable to lease connection", ex);
+      }
+      // Clear any prior warnings on the connection
+      try {
+        connection.clearWarnings();
+      } catch (SQLException ex) {
+        LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
+      }
+    }
+    count++;
+    LOGGER.trace("Tx count-begin: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void commit() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    if (rollback) {
+      throw new JdbcChannelException(
+          "Cannot commit transaction marked for rollback");
+    }
+    LOGGER.trace("Tx count-commit: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void rollback() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    LOGGER.warn("Marking transaction for rollback");
+    rollback = true;
+    LOGGER.trace("Tx count-rollback: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void close() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    count--;
+    LOGGER.debug("Tx count-close: " + count + ", rollback: " + rollback);
+    if (count == 0) {
+      active = false;
+      try {
+        if (rollback) {
+          LOGGER.info("Attempting transaction roll-back");
+          connection.rollback();
+        } else {
+          LOGGER.debug("Attempting transaction commit");
+          connection.commit();
+
+          // Commit successful. Update provider channel size
+          providerImpl.updateCurrentChannelSize(this.persistedEventCount
+              - this.removedEventCount);
+
+          this.persistedEventCount = 0;
+          this.removedEventCount = 0;
+
+        }
+      } catch (SQLException ex) {
+        throw new JdbcChannelException("Unable to finalize transaction", ex);
+      } finally {
+        if (connection != null) {
+          // Log Warnings
+          try {
+            SQLWarning warning = connection.getWarnings();
+            if (warning != null) {
+              StringBuilder sb = new StringBuilder("Connection warnigns: ");
+              boolean first = true;
+              while (warning != null) {
+                if (first) {
+                  first = false;
+                } else {
+                  sb.append("; ");
+                }
+                sb.append("[").append(warning.getErrorCode()).append("] ");
+                sb.append(warning.getMessage());
+              }
+              LOGGER.warn(sb.toString());
+            }
+          } catch (SQLException ex) {
+            LOGGER.error("Error while retrieving warnigns: "
+                                + ex.getErrorCode(), ex);
+          }
+
+          // Close Connection
+          try {
+            connection.close();
+          } catch (SQLException ex) {
+            LOGGER.error(
+                "Unable to close connection: " + ex.getErrorCode(), ex);
+          }
+        }
+
+        // Clean up thread local
+        txFactory.remove();
+
+        // Destroy local state
+        connection = null;
+        txFactory = null;
+      }
+    }
+  }
+
+  protected Connection getConnection() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    return connection;
+  }
+
+  protected void incrementRemovedEventCount() {
+    removedEventCount++;
+  }
+
+  protected void incrementPersistedEventCount() {
+    persistedEventCount++;
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
new file mode 100644
index 0000000..f240948
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.Connection;
+
+import javax.sql.DataSource;
+
+public class MySQLSchemaHandler implements SchemaHandler {
+
+  private final DataSource dataSource;
+
+  protected MySQLSchemaHandler(DataSource dataSource) {
+    this.dataSource = dataSource;
+  }
+
+  @Override
+  public boolean schemaExists() {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public void validateSchema() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void storeEvent(PersistableEvent pe, Connection connection) {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public PersistableEvent fetchAndDeleteEvent(String channel,
+      Connection connection) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public long getChannelSize(Connection connection) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void createSchemaObjects(boolean createForeignKeys, boolean createIndex) {
+    // TODO Auto-generated method stub
+
+  }
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
new file mode 100644
index 0000000..bf1163a
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+
+/**
+ * <p>An {@link Event} implementation used by the JDBC channel to persist
+ * events. The payload and each header name/value are split into a "base"
+ * part and an optional "spill" part according to the length thresholds
+ * defined in {@link ConfigurationConstants}, so each part can be stored in
+ * a bounded database column. Instances are immutable once built: the
+ * inherited setters throw {@link UnsupportedOperationException}.</p>
+ */
+public class PersistableEvent implements Event {
+
+  private long eventId;
+  private final String channel;
+  private byte[] basePayload;
+  private byte[] spillPayload;
+  private List<HeaderEntry> headers;
+
+  public PersistableEvent(String channel, Event event) {
+    this.channel = channel;
+
+    // Split the payload: bytes beyond the threshold go into the spill part.
+    byte[] givenPayload = event.getBody();
+    if (givenPayload.length < ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD) {
+      basePayload = Arrays.copyOf(givenPayload, givenPayload.length);
+      spillPayload = null;
+    } else {
+      basePayload = Arrays.copyOfRange(givenPayload, 0,
+          ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD);
+      spillPayload = Arrays.copyOfRange(givenPayload,
+          ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD, givenPayload.length);
+    }
+
+    // Headers are kept only when present; an empty map leaves headers null.
+    Map<String, String> headerMap = event.getHeaders();
+    if (headerMap != null && headerMap.size() > 0) {
+      headers = new ArrayList<HeaderEntry>();
+      for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+        String name = entry.getKey();
+        String value = entry.getValue();
+        headers.add(new HeaderEntry(name, value));
+      }
+    }
+  }
+
+  // Used by Builder to reconstitute an event previously read from the
+  // database; the parts are taken as-is without re-splitting.
+  private PersistableEvent(long eventId, String channel, byte[] basePayload,
+      byte[] spillPayload, List<HeaderEntry> headers) {
+    this.eventId = eventId;
+    this.channel = channel;
+    this.basePayload = basePayload;
+    this.spillPayload = spillPayload;
+    this.headers = headers;
+  }
+
+  public String getChannelName() {
+    return channel;
+  }
+
+  public byte[] getBasePayload() {
+    return this.basePayload;
+  }
+
+  public byte[] getSpillPayload() {
+    return this.spillPayload;
+  }
+
+  protected void setEventId(long eventId) {
+    this.eventId = eventId;
+  }
+
+  protected long getEventId() {
+    return this.eventId;
+  }
+
+  public List<HeaderEntry> getHeaderEntries() {
+    return headers;
+  }
+
+  /**
+   * A single header, with its name and value each held as a
+   * {@link SpillableString}. The id is -1 until assigned.
+   */
+  protected static class HeaderEntry {
+
+    private long headerId = -1L;
+    private SpillableString name;
+    private SpillableString value;
+
+    public HeaderEntry(String name, String value) {
+      this.name = new SpillableString(name,
+          ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD);
+      this.value = new SpillableString(value,
+          ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD);
+    }
+
+    // Reconstitution path: parts come straight from the database columns.
+    private HeaderEntry(long headerId, String baseName, String spillName,
+        String baseValue, String spillValue) {
+      this.headerId = headerId;
+      this.name = new SpillableString(baseName, spillName);
+      this.value = new SpillableString(baseValue, spillValue);
+    }
+
+    public String getNameString() {
+      return name.getString();
+    }
+
+    public SpillableString getName() {
+      return name;
+    }
+
+    public String getValueString() {
+      return value.getString();
+    }
+
+    public SpillableString getValue() {
+      return value;
+    }
+
+    protected void setId(long headerId) {
+      this.headerId = headerId;
+    }
+
+    public long getId() {
+      return headerId;
+    }
+
+  }
+
+  /**
+   * A string split into a base part whose encoded byte length stays under a
+   * threshold, plus an optional spill part holding the remainder.
+   */
+  protected static class SpillableString {
+
+    private String base;
+    private String spill;
+
+    public SpillableString(String string, int threshold) {
+      // NOTE(review): getBytes() uses the platform default charset here and
+      // below, so the split point can differ between JVMs — confirm intended.
+      if (string.getBytes().length < threshold) {
+        base = string;
+      } else {
+        // Identify the maximum character size that will fit in the
+        // given threshold
+        int currentIndex = threshold / 3; // Assuming 3 byte encoding worst case
+        int lastIndex = currentIndex;
+        // Advance one character at a time from the worst-case estimate until
+        // the encoded prefix would reach the threshold.
+        while (true) {
+          int length = string.substring(0, currentIndex).getBytes().length;
+          if (length < threshold) {
+            lastIndex = currentIndex;
+            currentIndex++;
+          } else {
+            break;
+          }
+        }
+        base = string.substring(0, lastIndex);
+        spill = string.substring(lastIndex);
+      }
+    }
+
+    private SpillableString(String base, String spill) {
+      this.base = base;
+      this.spill = spill;
+    }
+
+    public String getBase() {
+      return base;
+    }
+
+    public String getSpill() {
+      return spill;
+    }
+
+    public String getString() {
+      if (spill == null) {
+        return base;
+      }
+      return base + spill;
+    }
+
+    public boolean hasSpill() {
+      return spill != null;
+    }
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    throw new UnsupportedOperationException("Cannot update headers of "
+        + "persistable event");
+  }
+
+  @Override
+  public byte[] getBody() {
+    // Always returns a fresh copy; the caller cannot mutate internal state.
+    byte[] result = null;
+    if (spillPayload == null) {
+      result = Arrays.copyOf(basePayload, basePayload.length);
+    } else {
+      result = new byte[basePayload.length + spillPayload.length];
+      System.arraycopy(basePayload, 0, result, 0, basePayload.length);
+      System.arraycopy(spillPayload, 0, result,
+          basePayload.length, spillPayload.length);
+    }
+
+    return result;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+    throw new UnsupportedOperationException("Cannot update payload of "
+        + "persistable event");
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    // Returns null when the event was created without headers.
+    Map<String, String> headerMap = null;
+    if (headers != null) {
+      headerMap =  new HashMap<String, String>();
+      for (HeaderEntry entry :  headers) {
+        headerMap.put(entry.getNameString(), entry.getValueString());
+      }
+    }
+
+    return headerMap;
+  }
+
+  /**
+   * Builder used to reassemble a PersistableEvent from the base and spill
+   * parts read back from the database. {@link #build()} resets the builder
+   * state so the instance can be reused.
+   */
+  public static class Builder {
+
+    private long bEventId;
+    private String bChannelName;
+    private byte[] bBasePayload;
+    private byte[] bSpillPayload;
+    private Map<Long, HeaderPart> bHeaderParts;
+
+    public Builder(String channelName, long eventId) {
+      bChannelName = channelName;
+      bEventId = eventId;
+    }
+
+    public Builder setEventId(long eventId) {
+      bEventId = eventId;
+      return this;
+    }
+
+    public Builder setChannel(String channel) {
+      bChannelName = channel;
+      return this;
+    }
+
+    public Builder setBasePayload(byte[] basePayload) {
+      bBasePayload = basePayload;
+      return this;
+    }
+
+    public Builder setSpillPayload(byte[] spillPayload) {
+      bSpillPayload = spillPayload;
+      return this;
+    }
+
+    public Builder setHeader(long headerId, String baseName, String baseValue) {
+      if (bHeaderParts == null) {
+        bHeaderParts = new HashMap<Long, HeaderPart>();
+      }
+      HeaderPart hp = new HeaderPart(baseName, baseValue);
+      // Map.put returning non-null means this headerId was already set.
+      if (bHeaderParts.put(headerId, hp) != null) {
+        throw new JdbcChannelException("Duplicate header found: "
+            + "headerId: " + headerId + ", baseName: " + baseName + ", "
+            + "baseValue: " + baseValue);
+      }
+
+      return this;
+    }
+
+    public Builder setHeaderNameSpill(long headerId, String nameSpill) {
+      // The base header must have been registered via setHeader first.
+      HeaderPart hp = bHeaderParts.get(headerId);
+      if (hp == null) {
+        throw new JdbcChannelException("Header not found for spill: "
+            + headerId);
+      }
+
+      hp.setSpillName(nameSpill);
+
+      return this;
+    }
+
+    public Builder setHeaderValueSpill(long headerId, String valueSpill) {
+      // The base header must have been registered via setHeader first.
+      HeaderPart hp = bHeaderParts.get(headerId);
+      if (hp == null) {
+        throw new JdbcChannelException("Header not found for spill: "
+            + headerId);
+      }
+
+      hp.setSpillValue(valueSpill);
+
+      return this;
+    }
+
+    public PersistableEvent build() {
+      List<HeaderEntry> bHeaders = new ArrayList<HeaderEntry>();
+      if (bHeaderParts != null) {
+        for (long headerId : bHeaderParts.keySet()) {
+          HeaderPart part = bHeaderParts.get(headerId);
+          bHeaders.add(part.getEntry(headerId));
+        }
+      }
+
+      PersistableEvent pe = new PersistableEvent(bEventId, bChannelName,
+          bBasePayload, bSpillPayload, bHeaders);
+
+      // Reset builder state for reuse.
+      bEventId = 0L;
+      bChannelName = null;
+      bBasePayload = null;
+      bSpillPayload = null;
+      bHeaderParts = null;
+
+      return pe;
+    }
+  }
+
+  // Mutable holder for the four column parts of a header while the Builder
+  // is accumulating rows; converted to an immutable HeaderEntry on build.
+  @SuppressWarnings("unused")
+  private static class HeaderPart {
+    private final String hBaseName;
+    private final String hBaseValue;
+    private String hSpillName;
+    private String hSpillValue;
+
+    HeaderPart(String baseName, String baseValue) {
+      hBaseName = baseName;
+      hBaseValue = baseValue;
+    }
+
+    String getBaseName() {
+      return hBaseName;
+    }
+
+    String getBaseValue() {
+      return hBaseValue;
+    }
+
+    String getSpillName() {
+      return hSpillName;
+    }
+
+    String getSpillValue() {
+      return hSpillValue;
+    }
+
+    void setSpillName(String spillName) {
+      hSpillName = spillName;
+    }
+
+    void setSpillValue(String spillValue) {
+      hSpillValue = spillValue;
+    }
+
+    HeaderEntry getEntry(long headerId) {
+      return new HeaderEntry(headerId, hBaseName,
+          hSpillName, hBaseValue, hSpillValue);
+    }
+  }
+
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
new file mode 100644
index 0000000..9bfc227
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.Connection;
+
+/**
+ * <p>A handler for creating and validating database schema for use by
+ * the JDBC channel implementation.</p>
+ */
+public interface SchemaHandler {
+
+  /**
+   * @return true if the schema exists. False otherwise.
+   */
+  public boolean schemaExists();
+
+  /**
+   * Validates the schema.
+   */
+  public void validateSchema();
+
+  /**
+   * Creates the schema.
+   * @param createForeignKeys a flag which indicates if the foreign key
+   *        constraints should be created where necessary.
+   * @param createIndex a flag which indicates if indexes must be created during
+   *        the creation of the schema.
+   */
+  public void createSchemaObjects(boolean createForeignKeys,
+      boolean createIndex);
+
+  /**
+   * Inserts the given persistent event into the database. The connection that
+   * is passed into the handler has an ongoing transaction and therefore the
+   * SchemaHandler implementation must not close the connection.
+   *
+   * @param pe the event to persist
+   * @param connection the connection to use
+   */
+  public void storeEvent(PersistableEvent pe, Connection connection);
+
+  /**
+   * Retrieves the next persistent event from the database. The connection that
+   * is passed into the handler has an ongoing transaction and therefore the
+   * SchemaHandler implementation must not close the connection.
+   *
+   * @param channel the channel name from which event will be retrieved
+   * @param connection the connection to use
+   * @return the next persistent event if available or null
+   */
+  public PersistableEvent fetchAndDeleteEvent(
+      String channel, Connection connection);
+
+  /**
+   * Returns the current size of the channel using the connection specified that
+   * must have an active transaction ongoing. This allows the provider impl to
+   * enforce channel capacity limits when persisting events.
+   * @param connection the connection to use; it has an ongoing transaction
+   *        and must not be closed by the implementation.
+   * @return the current size of the channel.
+   */
+  public long getChannelSize(Connection connection);
+}
diff --git a/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
new file mode 100644
index 0000000..35f4c61
--- /dev/null
+++ b/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.channel.jdbc.DatabaseType;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+
+/**
+ * <p>A factory for SchemaHandlers.</p>
+ */
+public final class SchemaHandlerFactory {
+
+  /**
+   * Creates the {@link SchemaHandler} for the given database type.
+   * @param dbType the target database type; only DERBY and MYSQL are
+   *        currently supported
+   * @param dataSource the data source the handler will operate against
+   * @return a new schema handler for the given database type
+   * @throws JdbcChannelException if the database type is not supported
+   */
+  public static SchemaHandler getHandler(DatabaseType dbType, DataSource dataSource) {
+    SchemaHandler handler = null;
+    switch (dbType) {
+      case DERBY:
+        handler = new DerbySchemaHandler(dataSource);
+        break;
+      case MYSQL:
+        handler = new MySQLSchemaHandler(dataSource);
+        break;
+      default:
+        throw new JdbcChannelException("Database " + dbType + " not supported yet");
+    }
+
+    return handler;
+  }
+
+  private SchemaHandlerFactory() {
+    // Disable explicit object creation
+  }
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
new file mode 100644
index 0000000..b208f47
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>Base test for the JDBC channel provider, run against an in-memory Derby
+ * database. Subclasses supply additional channel configuration via
+ * {@link #configureChannel(Context)}.</p>
+ */
+public abstract class BaseJdbcChannelProviderTest {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(BaseJdbcChannelProviderTest.class);
+
+  private Context derbyCtx = new Context();
+  private File derbyDbDir;
+  private JdbcChannelProviderImpl provider;
+
+
+  /**
+   * Hook for subclasses to add or override channel configuration before the
+   * provider is initialized.
+   * @param context the context populated with the Derby defaults
+   */
+  protected abstract void configureChannel(Context context);
+
+  @Before
+  public void setUp() throws IOException {
+    derbyCtx.clear();
+    derbyCtx.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
+    derbyCtx.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
+    derbyCtx.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
+        "org.apache.derby.jdbc.EmbeddedDriver");
+
+    derbyCtx.put(ConfigurationConstants.CONFIG_PASSWORD, "");
+    derbyCtx.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
+
+    File tmpDir = new File("target/test");
+    tmpDir.mkdirs();
+
+    // Route Derby's log output to a file under the test directory.
+    File derbyLogFile = new File(tmpDir, "derbytest.log");
+    String derbyLogFilePath = derbyLogFile.getCanonicalPath();
+
+    derbyCtx.put(ConfigurationConstants.CONFIG_JDBC_SYSPROP_PREFIX
+        + "derby.stream.error.file", derbyLogFilePath);
+
+    // Use a temp file to create a temporary directory
+    File tempFile = File.createTempFile("temp", "_db", tmpDir);
+    String absFileName = tempFile.getCanonicalPath();
+    tempFile.delete();
+
+    derbyDbDir = new File(absFileName + "_dir");
+
+    if (!derbyDbDir.exists()) {
+      derbyDbDir.mkdirs();
+    }
+
+    derbyCtx.put(ConfigurationConstants.CONFIG_URL,
+        "jdbc:derby:memory:" + derbyDbDir.getCanonicalPath() + "/db;create=true");
+
+    configureChannel(derbyCtx);
+
+    LOGGER.info("Derby Properties: " + derbyCtx);
+  }
+
+  @Test
+  public void testDerbyChannelCapacity() {
+    provider = new JdbcChannelProviderImpl();
+
+    derbyCtx.put(ConfigurationConstants.CONFIG_MAX_CAPACITY, "10");
+
+    provider.initialize(derbyCtx);
+
+    Set<MockEvent> events = new HashSet<MockEvent>();
+    for (int i = 1; i < 12; i++) {
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 1));
+    }
+
+    Iterator<MockEvent> meIt = events.iterator();
+    int count = 0;
+    while (meIt.hasNext()) {
+      count++;
+      MockEvent me = meIt.next();
+      String chName = me.getChannel();
+      try {
+        provider.persistEvent(chName, me);
+        if (count == 11) {
+          Assert.fail();
+        }
+      } catch (JdbcChannelException ex) {
+        // Expected only when the 11th event exceeds the capacity of 10.
+        Assert.assertEquals(11, count);
+      }
+
+      // Now should be able to remove one event and add this one
+      Event e = provider.removeEvent(chName);
+      Assert.assertNotNull(e);
+
+      // The current event should safely persist now
+      provider.persistEvent(chName, me);
+    }
+  }
+
+  @Test
+  public void testDerbySetup() {
+    provider = new JdbcChannelProviderImpl();
+
+    provider.initialize(derbyCtx);
+
+    // Within the same thread of execution the provider hands back the same
+    // transaction instance until it is fully closed.
+    Transaction tx1 = provider.getTransaction();
+    tx1.begin();
+
+    Transaction tx2 = provider.getTransaction();
+
+    Assert.assertSame(tx1, tx2);
+    tx2.begin();
+    tx2.close();
+    tx1.close();
+
+    Transaction tx3 = provider.getTransaction();
+    Assert.assertNotSame(tx1, tx3);
+
+    tx3.begin();
+    tx3.close();
+
+    provider.close();
+    provider = null;
+  }
+
+  /**
+   * Creates 120 events split over 10 channels, stores them via multiple
+   * simulated sources and consumes them via multiple simulated sinks.
+   */
+  @Test
+  public void testEventWithSimulatedSourceAndSinks() throws Exception {
+    provider = new JdbcChannelProviderImpl();
+    provider.initialize(derbyCtx);
+
+    Map<String, List<MockEvent>> eventMap =
+        new HashMap<String, List<MockEvent>>();
+
+    for (int i = 1; i < 121; i++) {
+      MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61 % i, 10);
+      List<MockEvent> meList = eventMap.get(me.getChannel());
+      if (meList == null) {
+        meList = new ArrayList<MockEvent>();
+        eventMap.put(me.getChannel(), meList);
+      }
+      meList.add(me);
+    }
+
+    List<MockSource> sourceList = new ArrayList<MockSource>();
+    List<MockSink> sinkList = new ArrayList<MockSink>();
+
+    for (String channel : eventMap.keySet()) {
+      List<MockEvent> meList = eventMap.get(channel);
+      sourceList.add(new MockSource(channel, meList, provider));
+      sinkList.add(new MockSink(channel, meList, provider));
+    }
+
+    ExecutorService sourceExecutor = Executors.newFixedThreadPool(10);
+    ExecutorService sinkExecutor = Executors.newFixedThreadPool(10);
+
+    // Start all sources, wait briefly, then start the sinks concurrently.
+    List<Future<Integer>> srcResults = sourceExecutor.invokeAll(sourceList,
+        300, TimeUnit.SECONDS);
+    Thread.sleep(MockEventUtils.generateSleepInterval(3000));
+    List<Future<Integer>> sinkResults = sinkExecutor.invokeAll(sinkList,
+        300, TimeUnit.SECONDS);
+
+    int srcCount = 0;
+    for (Future<Integer> srcOutput : srcResults) {
+      srcCount += srcOutput.get();
+    }
+
+    Assert.assertEquals(120, srcCount);
+
+    int sinkCount = 0;
+    for (Future<Integer> sinkOutput : sinkResults) {
+      sinkCount += sinkOutput.get();
+    }
+
+    Assert.assertEquals(120, sinkCount);
+
+  }
+
+
+  /**
+   * Creates 80 events split over 5 channels, stores them and verifies they
+   * are retrieved in persistence order.
+   */
+  // NOTE(review): method name has a typo ("Peristing" -> "Persisting");
+  // left unchanged here to preserve the recorded commit.
+  @Test
+  public void testPeristingEvents() {
+    provider = new JdbcChannelProviderImpl();
+    provider.initialize(derbyCtx);
+
+    Map<String, List<MockEvent>> eventMap =
+        new HashMap<String, List<MockEvent>>();
+
+    Set<MockEvent> events = new HashSet<MockEvent>();
+    for (int i = 1; i < 81; i++) {
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 5));
+    }
+
+    Iterator<MockEvent> meIt = events.iterator();
+    while (meIt.hasNext()) {
+      MockEvent me = meIt.next();
+      String chName = me.getChannel();
+      List<MockEvent> eventList = eventMap.get(chName);
+      if (eventList == null) {
+        eventList = new ArrayList<MockEvent>();
+        eventMap.put(chName, eventList);
+      }
+      eventList.add(me);
+      provider.persistEvent(me.getChannel(), me);
+    }
+
+    // Now retrieve the events and they should be in the persistence order
+
+    for (String chName : eventMap.keySet()) {
+      List<MockEvent> meList = eventMap.get(chName);
+      Iterator<MockEvent> it = meList.iterator();
+      while (it.hasNext()) {
+        MockEvent me = it.next();
+        Event event = provider.removeEvent(chName);
+        assertEquals(me, event);
+      }
+
+      // Now there should be no more events for this channel
+      Event nullEvent = provider.removeEvent(chName);
+      Assert.assertNull(nullEvent);
+    }
+
+    provider.close();
+    provider = null;
+  }
+
+  // Asserts that two events have identical payloads and identical header
+  // maps (null and empty header maps are treated as equivalent).
+  private static void assertEquals(Event e1, Event e2) {
+    byte[] pl1 = e1.getBody();
+    byte[] pl2 = e2.getBody();
+
+    Assert.assertArrayEquals(pl1, pl2);
+    Map<String, String> h1 = e1.getHeaders();
+    Map<String, String> h2 = e2.getHeaders();
+    if (h1 == null || h1.size() == 0) {
+      Assert.assertTrue(h2 == null || h2.size() == 0);
+    } else {
+      Assert.assertTrue(h1.size() == h2.size());
+      for (String key : h1.keySet()) {
+        Assert.assertTrue(h2.containsKey(key));
+        String v1 = h1.get(key);
+        String v2 = h2.remove(key);
+        Assert.assertEquals(v1, v2);
+      }
+      Assert.assertTrue(h2.size() == 0);
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (provider != null) {
+      try {
+        provider.close();
+      } catch (Exception ex) {
+        LOGGER.error("Unable to close provider", ex);
+      }
+    }
+    provider = null;
+  }
+
+  /**
+   * Simulated sink: drains the given events from its channel, polling with
+   * randomized sleeps while the queue is empty, and verifies each retrieved
+   * event matches the expected one.
+   */
+  private static class MockSink implements Callable<Integer> {
+
+    private final String channel;
+    private final List<MockEvent> events;
+    private final JdbcChannelProviderImpl provider;
+
+    private MockSink(String channel, List<MockEvent> events,
+        JdbcChannelProviderImpl provider) {
+      this.channel = channel;
+      this.events = events;
+      this.provider = provider;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      LOGGER.debug("Sink for channel[" + channel + "]: starting");
+      if (events == null) {
+        return 0;
+      }
+      Iterator<MockEvent> it = events.iterator();
+      while (it.hasNext()) {
+        MockEvent me = it.next();
+        Event event = null;
+        while (event == null) {
+          event = provider.removeEvent(channel);
+          if (event == null) {
+            LOGGER.debug("Sink for channel[" + channel + "]: empty queue");
+            try {
+              Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+            }
+          } else {
+            LOGGER.debug("Sink for channel[" + channel + "]: removed event: "
+                    + event);
+          }
+        }
+        BaseJdbcChannelProviderTest.assertEquals(me, event);
+      }
+
+      LOGGER.debug("Sink for channel[" + channel + "]: retrieved all events");
+
+      return events.size();
+    }
+  }
+
+  /**
+   * Simulated source: persists the given events to its channel with a
+   * randomized sleep between events.
+   */
+  private static class MockSource implements Callable<Integer> {
+
+    private final String channel;
+    private final List<MockEvent> events;
+    private final JdbcChannelProviderImpl provider;
+
+    private MockSource(String channel, List<MockEvent> events,
+        JdbcChannelProviderImpl provider) {
+      this.channel = channel;
+      this.events = events;
+      this.provider = provider;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      LOGGER.debug("Source for channel[" + channel + "]: starting");
+      if (events == null) {
+        return 0;
+      }
+      Iterator<MockEvent> it = events.iterator();
+      while (it.hasNext()) {
+        MockEvent me = it.next();
+        Assert.assertEquals(channel, me.getChannel());
+        provider.persistEvent(channel, me);
+        try {
+          Thread.sleep(MockEventUtils.generateSleepInterval(1000));
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      LOGGER.debug("Source for channel[" + channel + "]: submitted all events");
+
+      return events.size();
+    }
+
+  }
+
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
new file mode 100644
index 0000000..6804a9f
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+/**
+ * <p>Immutable test {@link Event} carrying a payload, headers, and the name
+ * of the channel it is destined for. The setters are intentional no-ops.</p>
+ */
+public class MockEvent implements Event {
+
+  private final byte[] payload;
+  private final Map<String, String> headers;
+  private final String channel;
+
+  public MockEvent(byte[] payload, Map<String, String> headers, String channel) {
+    this.payload = payload;
+    this.headers = headers;
+    this.channel = channel;
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    // Intentionally a no-op: mock events are immutable.
+
+  }
+
+  @Override
+  public byte[] getBody() {
+    return payload;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+    // Intentionally a no-op: mock events are immutable.
+
+  }
+
+  // The channel this event is associated with (not part of the Event API).
+  public String getChannel() {
+    return channel;
+  }
+
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
new file mode 100644
index 0000000..e5ee324
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public final class MockEventUtils {
+
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(MockEventUtils.class);
+
+  private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+  private static final String[] CHARS = new String[] {
+    "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
+    "s","t","u","v","w","x","y","z",
+    "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
+    "S","T","U","V","W","X","Y","Z",
+    "0","1","2","3","4","5","6","7","8","9",
+    "!","@","#","$","%","^","&","*","(",")",
+    "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
+  };
+
+  public static byte[] generatePayload(int size) {
+    byte[] result = new byte[size];
+    RANDOM.nextBytes(result);
+    return result;
+  }
+
+  public static String generateHeaderString(int size) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      int x = Math.abs(RANDOM.nextInt());
+      int y = x % CHARS.length;
+      sb.append(CHARS[y]);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Generates a mock event using the specified margins that are offset from
+   * the threshold values of the various sizes. Also the number of headers is
+   * specified along with number of channels. The last parameter - numChannels
+   * is used to calculate a channel name that will be used to tag the event
+   * with.
+   * @param payloadMargin
+   * @param headerNameMargin
+   * @param headerValueMargin
+   * @param numHeaders
+   * @param numChannels
+   * @return
+   */
+  public static MockEvent generateMockEvent(int payloadMargin, int headerNameMargin,
+                                            int headerValueMargin, int numHeaders,
+                                            int numChannels) {
+
+    int chIndex = 0;
+    if (numChannels > 1) {
+      chIndex = Math.abs(RANDOM.nextInt()) % numChannels;
+    }
+    String channel = "test-" + chIndex;
+
+    StringBuilder sb = new StringBuilder("New Event[payload size:");
+
+    int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+    int plSize = Math.abs(RANDOM.nextInt()) % plTh + payloadMargin;
+    sb.append(plSize).append(", numHeaders:").append(numHeaders);
+    sb.append(", channel:").append(channel);
+
+    byte[] payload = generatePayload(plSize);
+    int nmTh = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+    int vlTh = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+
+    Map<String, String> headers = new HashMap<String, String>();
+    for (int i = 0; i < numHeaders; i++) {
+      int nmSize = Math.abs(RANDOM.nextInt()) % nmTh + headerNameMargin;
+      int vlSize = Math.abs(RANDOM.nextInt()) % vlTh + headerValueMargin;
+
+      String name = generateHeaderString(nmSize);
+      String value = generateHeaderString(vlSize);
+
+      headers.put(name, value);
+      sb.append("{nm:").append(nmSize).append(",vl:").append(vlSize);
+      sb.append("} ");
+    }
+
+    LOGGER.debug(sb.toString());
+
+    return new MockEvent(payload, headers, channel);
+  }
+
+  public static int generateSleepInterval(int upperBound) {
+    return Math.abs(RANDOM.nextInt(upperBound));
+  }
+
+  private MockEventUtils() {
+    // Disable explicit object creation
+  }
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java
new file mode 100644
index 0000000..39f488c
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * The purpose of this test is to guard against accidental backward
+ * compatibility problem since the string representation so of DatabaseType enum
+ * are a public interface used in configuration.
+ */
+public class TestDatabaseTypeEnum {
+
+  public static final String DBTYPE_OTHER = "OTHER";
+  public static final String DBTYPE_DERBY = "DERBY";
+  public static final String DBTYPE_MYSQL = "MYSQL";
+  public static final String DBTYPE_PGSQL = "POSTGRESQL";
+  public static final String DBTYPE_ORACLE = "ORACLE";
+
+  private Map<String, DatabaseType> enumMap =
+      new HashMap<String, DatabaseType>();
+
+  @Before
+  public void setUp() {
+    enumMap.clear();
+    enumMap.put(DBTYPE_OTHER, DatabaseType.OTHER);
+    enumMap.put(DBTYPE_DERBY, DatabaseType.DERBY);
+    enumMap.put(DBTYPE_MYSQL, DatabaseType.MYSQL);
+    enumMap.put(DBTYPE_PGSQL, DatabaseType.POSTGRESQL);
+    enumMap.put(DBTYPE_ORACLE, DatabaseType.ORACLE);
+  }
+
+  @Test
+  public void testDatabaseTypeLookup() {
+    for (String key : enumMap.keySet()) {
+      DatabaseType type = enumMap.get(key);
+      DatabaseType lookupType = DatabaseType.valueOf(key);
+      String lookupTypeName = lookupType.getName();
+
+      Assert.assertEquals(lookupTypeName, lookupType.toString());
+      Assert.assertSame(type, lookupType);
+      Assert.assertEquals(key, lookupTypeName);
+
+      DatabaseType lookupType2 = DatabaseType.getByName(key.toLowerCase(Locale.ENGLISH));
+      Assert.assertSame(type, lookupType2);
+    }
+  }
+
+  @Test
+  public void testUnknonwnDatabaseTypeLookup() {
+    String[] invalidTypes = new String[] { "foo", "bar", "abcd" };
+
+    for (String key : invalidTypes) {
+      DatabaseType type = DatabaseType.getByName(key);
+
+      Assert.assertSame(type, DatabaseType.OTHER);
+    }
+  }
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
new file mode 100644
index 0000000..cad972c
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.channel.jdbc.impl.DerbySchemaHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDerbySchemaHandlerQueries {
+
+  public static final String EXPECTED_QUERY_CREATE_SCHEMA_FLUME
+       = "CREATE SCHEMA FLUME";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_EVENT
+       = "CREATE TABLE FLUME.FL_EVENT ( FLE_ID BIGINT GENERATED ALWAYS AS "
+           + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
+           + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(64), "
+           + "FLE_SPILL BOOLEAN)";
+
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL
+       = "CREATE INDEX FLUME.IDX_FLE_CHANNEL ON FLUME.FL_EVENT (FLE_CHANNEL)";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL_FK
+       = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB, "
+           + "FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL_NOFK
+      = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB)";
+
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT
+       = "CREATE INDEX FLUME.IDX_FLP_EVENT ON FLUME.FL_PLSPILL (FLP_EVENT)";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER_FK
+       = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
+           + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+           + "FLH_EVENT BIGINT, FLH_NAME VARCHAR(251), "
+           + "FLH_VALUE VARCHAR(251), FLH_NMSPILL BOOLEAN, "
+           + "FLH_VLSPILL BOOLEAN, FOREIGN KEY (FLH_EVENT) "
+           + "REFERENCES FLUME.FL_EVENT (FLE_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER_NOFK
+      = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
+           + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+           + "FLH_EVENT BIGINT, FLH_NAME VARCHAR(251), "
+           + "FLH_VALUE VARCHAR(251), FLH_NMSPILL BOOLEAN, "
+           + "FLH_VLSPILL BOOLEAN)";
+
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT
+       = "CREATE INDEX FLUME.IDX_FLH_EVENT ON FLUME.FL_HEADER (FLH_EVENT)";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL_FK
+       = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
+           + "VARCHAR(32517), FOREIGN KEY (FLN_HEADER) REFERENCES "
+           + "FLUME.FL_HEADER (FLH_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL_NOFK
+      = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
+           + "VARCHAR(32517))";
+
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER
+       = "CREATE INDEX FLUME.IDX_FLN_HEADER ON FLUME.FL_NMSPILL (FLN_HEADER)";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL_FK
+       = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
+            + "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
+            + "FLUME.FL_HEADER (FLH_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL_NOFK
+      = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
+            + "VARCHAR(32517))";
+
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER
+       = "CREATE INDEX FLUME.IDX_FLV_HEADER ON FLUME.FL_VLSPILL (FLV_HEADER)";
+
+  public static final String EXPECTED_COLUMN_LOOKUP_QUERY
+      = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+          + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+          + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+          + "SCHEMANAME = ? ))";
+
+  public static final String EXPECTED_QUERY_CHANNEL_SIZE
+      = "SELECT COUNT(*) FROM FLUME.FL_EVENT";
+
+  public static final String EXPECTED_STMT_INSERT_EVENT_BASE
+      = "INSERT INTO FLUME.FL_EVENT (FLE_PAYLOAD, FLE_CHANNEL, FLE_SPILL) "
+          + "VALUES ( ?, ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_EVENT_SPILL
+      = "INSERT INTO FLUME.FL_PLSPILL (FLP_EVENT, FLP_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_BASE
+      = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, "
+          + "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL
+      = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL
+      = "INSERT INTO FLUME.FL_VLSPILL (FLV_HEADER, FLV_SPILL) VALUES ( ?, ?)";
+
+  public static final String EXPECTED_STMT_FETCH_PAYLOAD_BASE
+      = "SELECT FLE_ID, FLE_PAYLOAD, FLE_SPILL FROM FLUME.FL_EVENT WHERE "
+          + "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE "
+          + "FLE_CHANNEL = ?)";
+
+  public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL
+      = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_BASE
+      = "SELECT FLH_ID, FLH_NAME, FLH_VALUE, FLH_NMSPILL, FLH_VLSPILL FROM "
+          + "FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_NAME_SPILL
+      = "SELECT FLN_SPILL FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+  public static final String EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL
+      = "SELECT FLV_SPILL FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL
+      = "DELETE FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_NAME_SPILL
+      = "DELETE FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
+
+  public static final String EXPECTED_STMT_DELETE_EVENT_SPILL
+      = "DELETE FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
+
+  public static final String EXPECTED_STMT_DELETE_HEADER_BASE
+      = "DELETE FROM FLUME.FL_HEADER WHERE FLH_EVENT = ?";
+
+  public static final String EXPECTED_STMT_DELETE_EVENT_BASE
+      = "DELETE FROM FLUME.FL_EVENT WHERE FLE_ID = ?";
+
+
+  @Test
+  public void testCreateQueries() {
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_SCHEMA_FLUME,
+        EXPECTED_QUERY_CREATE_SCHEMA_FLUME);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
+        EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLE_CHANNEL,
+        EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL_FK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL_FK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL_NOFK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL_NOFK);
+
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLP_EVENT,
+        EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER_FK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_HEADER_FK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER_NOFK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_HEADER_NOFK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLH_EVENT,
+        EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL_FK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL_FK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL_NOFK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL_NOFK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLN_HEADER,
+        EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL_FK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL_FK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL_NOFK,
+        EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL_NOFK);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLV_HEADER,
+        EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER);
+
+    Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
+        EXPECTED_COLUMN_LOOKUP_QUERY);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CHANNEL_SIZE,
+        EXPECTED_QUERY_CHANNEL_SIZE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_BASE,
+        EXPECTED_STMT_INSERT_EVENT_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_SPILL,
+        EXPECTED_STMT_INSERT_EVENT_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_BASE,
+        EXPECTED_STMT_INSERT_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_NAME_SPILL,
+        EXPECTED_STMT_INSERT_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_INSERT_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_BASE,
+        EXPECTED_STMT_FETCH_PAYLOAD_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_PAYLOAD_SPILL,
+        EXPECTED_STMT_FETCH_PAYLOAD_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_BASE,
+        EXPECTED_STMT_FETCH_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_NAME_SPILL,
+        EXPECTED_STMT_FETCH_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_FETCH_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_FETCH_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_VALUE_SPILL,
+        EXPECTED_STMT_DELETE_HEADER_VALUE_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_NAME_SPILL,
+        EXPECTED_STMT_DELETE_HEADER_NAME_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_SPILL,
+        EXPECTED_STMT_DELETE_EVENT_SPILL);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_HEADER_BASE,
+        EXPECTED_STMT_DELETE_HEADER_BASE);
+
+    Assert.assertEquals(DerbySchemaHandler.STMT_DELETE_EVENT_BASE,
+        EXPECTED_STMT_DELETE_EVENT_BASE);
+
+  }
+
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
new file mode 100644
index 0000000..7a26b5e
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.Context;
+
+public class TestJdbcChannelProvider extends BaseJdbcChannelProviderTest {
+
+  @Override
+  protected void configureChannel(Context context) {
+    // using default configuration
+  }
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProviderNoFK.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProviderNoFK.java
new file mode 100644
index 0000000..b64fa0e
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProviderNoFK.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.Context;
+
+public class TestJdbcChannelProviderNoFK extends BaseJdbcChannelProviderTest {
+
+  @Override
+  protected void configureChannel(Context context) {
+    context.put(ConfigurationConstants.CONFIG_CREATE_FK, "false");
+  }
+
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
new file mode 100644
index 0000000..3ae5227
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.channel.jdbc.impl.PersistableEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPersistentEvent {
+
+  @Test
+  public void testMarshalling() {
+
+    int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+    int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+
+    byte[] s1 = MockEventUtils.generatePayload(1);
+    runTest(s1, null);
+
+    byte[] s2 = MockEventUtils.generatePayload(2);
+    runTest(s2, new HashMap<String, String>());
+
+    int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+    byte[] s3 = MockEventUtils.generatePayload(th - 2);
+    Map<String, String> m3 = new HashMap<String, String>();
+    m3.put(MockEventUtils.generateHeaderString(1),
+        MockEventUtils.generateHeaderString(1));
+    runTest(s3, m3);
+
+    byte[] s4 = MockEventUtils.generatePayload(th - 1);
+    Map<String, String> m4 = new HashMap<String, String>();
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit), "z");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
+    m4.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
+    runTest(s4, m4);
+
+    byte[] s5 = MockEventUtils.generatePayload(th);
+    Map<String, String> m5 = new HashMap<String, String>();
+    m5.put("w", MockEventUtils.generateHeaderString(valLimit - 21));
+    m5.put("x", MockEventUtils.generateHeaderString(valLimit - 2));
+    m5.put("y", MockEventUtils.generateHeaderString(valLimit - 1));
+    m5.put("z", MockEventUtils.generateHeaderString(valLimit));
+    m5.put("a", MockEventUtils.generateHeaderString(valLimit + 1));
+    m5.put("b", MockEventUtils.generateHeaderString(valLimit + 2));
+    m5.put("c", MockEventUtils.generateHeaderString(valLimit + 21));
+    runTest(s5, m5);
+
+    byte[] s6 = MockEventUtils.generatePayload(th + 1);
+    Map<String, String> m6 = new HashMap<String, String>();
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 21),
+           MockEventUtils.generateHeaderString(valLimit - 21));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 2),
+           MockEventUtils.generateHeaderString(valLimit - 2));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit - 1),
+           MockEventUtils.generateHeaderString(valLimit - 1));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit),
+           MockEventUtils.generateHeaderString(valLimit));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 1),
+           MockEventUtils.generateHeaderString(valLimit + 1));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 2),
+           MockEventUtils.generateHeaderString(valLimit + 2));
+    m6.put(MockEventUtils.generateHeaderString(nameLimit + 21),
+           MockEventUtils.generateHeaderString(valLimit + 21));
+
+    runTest(s6, m6);
+
+    byte[] s7 = MockEventUtils.generatePayload(th + 2);
+    runTest(s7, null);
+
+    byte[] s8 = MockEventUtils.generatePayload(th + 27);
+    runTest(s8, null);
+  }
+
+
+  private void runTest(byte[] payload, Map<String, String> headers) {
+    PersistableEvent pe = new PersistableEvent("test",
+        new MockEvent(payload, headers, null));
+    Assert.assertArrayEquals(payload, pe.getBody());
+    Map<String, String> h = pe.getHeaders();
+    if (h == null) {
+      Assert.assertTrue(headers == null || headers.size() == 0);
+    } else {
+      Assert.assertTrue(headers.size() == h.size());
+      for (String key : h.keySet()) {
+        Assert.assertTrue(headers.containsKey(key));
+        String value = h.get(key);
+        String expectedValue = headers.remove(key);
+        Assert.assertEquals(expectedValue, value);
+      }
+      Assert.assertTrue(headers.size() == 0);
+    }
+  }
+}
diff --git a/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java
new file mode 100644
index 0000000..f96b6d7
--- /dev/null
+++ b/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * The purpose of this test is to guard against accidental backward
+ * compatibility problem since the string representation so of
+ * TransactionIsolation enum are a public interface used in configuration.
+ */
+public class TestTransactionIsolationLevelEnum {
+
+  public static final String TX_READ_UNCOMMITTED = "READ_UNCOMMITTED";
+  public static final String TX_READ_COMMITTED = "READ_COMMITTED";
+  public static final String TX_REPEATABLE_READ = "REPEATABLE_READ";
+  public static final String TX_SERIALIZABLE = "SERIALIZABLE";
+
+  private Map<String, TransactionIsolation> enumMap =
+      new HashMap<String, TransactionIsolation>();
+
+  @Before
+  public void setUp() {
+    enumMap.clear();
+    enumMap.put(TX_READ_UNCOMMITTED, TransactionIsolation.READ_UNCOMMITTED);
+    enumMap.put(TX_READ_COMMITTED, TransactionIsolation.READ_COMMITTED);
+    enumMap.put(TX_REPEATABLE_READ, TransactionIsolation.REPEATABLE_READ);
+    enumMap.put(TX_SERIALIZABLE, TransactionIsolation.SERIALIZABLE);
+  }
+
+  @Test
+  public void testReverseLookup() {
+    for (String key : enumMap.keySet()) {
+      TransactionIsolation txIsolation = enumMap.get(key);
+      TransactionIsolation lookupTxIsolation =
+          TransactionIsolation.valueOf(key);
+      String lookupTxIsolationName = lookupTxIsolation.getName();
+
+      Assert.assertEquals(lookupTxIsolationName, lookupTxIsolation.toString());
+      Assert.assertSame(txIsolation, lookupTxIsolation);
+      Assert.assertEquals(key, lookupTxIsolationName);
+
+      TransactionIsolation lookupTxIsolation2 =
+          TransactionIsolation.getByName(key.toLowerCase(Locale.ENGLISH));
+      Assert.assertSame(txIsolation, lookupTxIsolation2);
+    }
+  }
+}
diff --git a/flume-jdbc-dist/pom.xml b/flume-jdbc-dist/pom.xml
new file mode 100644
index 0000000..0fd0bff
--- /dev/null
+++ b/flume-jdbc-dist/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-jdbc-parent</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flume-jdbc-dist</artifactId>
+  <name>Flume JDBC Distribution</name>
+  <packaging>pom</packaging>
+
+  <properties>
+    <maven.deploy.skip>true</maven.deploy.skip>
+    <maven.install.skip>true</maven.install.skip>
+    <maven.test.skip>true</maven.test.skip>
+    <spotless.check.skip>true</spotless.check.skip>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-jdbc-channel</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- calculate checksums of source release for Apache dist area -->
+      <plugin>
+        <groupId>net.nicoulaj.maven.plugins</groupId>
+        <artifactId>checksum-maven-plugin</artifactId>
+        <version>${checksum-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>calculate-checksums</id>
+            <goals>
+              <goal>files</goal>
+            </goals>
+            <!-- execute prior to maven-gpg-plugin:sign due to https://github.com/nicoulaj/checksum-maven-plugin/issues/112 -->
+            <phase>post-integration-test</phase>
+            <configuration>
+              <algorithms>
+                <algorithm>SHA-512</algorithm>
+              </algorithms>
+              <!-- https://maven.apache.org/apache-resource-bundles/#source-release-assembly-descriptor -->
+              <fileSets>
+                <fileSet>
+                  <directory>${project.build.directory}</directory>
+                  <includes>
+                    <include>apache-flume-jdbc-${project.version}-src.zip</include>
+                    <include>apache-flume-jdbc-${project.version}-src.tar.gz</include>
+                    <include>apache-flume-jdbc-${project.version}-bin.zip</include>
+                    <include>apache-flume-jdbc-${project.version}-bin.tar.gz</include>
+                  </includes>
+                </fileSet>
+              </fileSets>
+              <csvSummary>false</csvSummary>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>post-integration-test</phase>
+            <configuration>
+              <target>
+                <property name="spaces" value=" " />
+                <concat destfile="${project.build.directory}/apache-flume-jdbc-${project.version}-src.zip.sha512" append="yes">${spaces}apache-flume-jdbc-${project.version}-src.zip</concat>
+                <concat destfile="${project.build.directory}/apache-flume-jdbc-${project.version}-src.tar.gz.sha512" append="yes">${spaces}apache-flume-jdbc-${project.version}-src.tar.gz</concat>
+                <concat destfile="${project.build.directory}/apache-flume-jdbc-${project.version}-bin.zip.sha512" append="yes">${spaces}apache-flume-jdbc-${project.version}-bin.zip</concat>
+                <concat destfile="${project.build.directory}/apache-flume-jdbc-${project.version}-bin.tar.gz.sha512" append="yes">${spaces}apache-flume-jdbc-${project.version}-bin.tar.gz</concat>
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>source-release-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>apache-flume-jdbc-${project.version}</finalName>
+              <descriptors>
+                <descriptor>src/assembly/src.xml</descriptor>
+              </descriptors>
+              <tarLongFileMode>gnu</tarLongFileMode>
+            </configuration>
+          </execution>
+          <execution>
+            <id>binary</id>
+            <configuration>
+              <finalName>apache-flume-jdbc-${project.version}</finalName>
+              <descriptors>
+                <descriptor>src/assembly/bin.xml</descriptor>
+              </descriptors>
+              <tarLongFileMode>gnu</tarLongFileMode>
+            </configuration>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/flume-jdbc-dist/src/assembly/bin.xml b/flume-jdbc-dist/src/assembly/bin.xml
new file mode 100644
index 0000000..fbd1801
--- /dev/null
+++ b/flume-jdbc-dist/src/assembly/bin.xml
@@ -0,0 +1,50 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<assembly>
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+        <format>zip</format>
+    </formats>
+    <baseDirectory>apache-flume-jdbc-${project.version}-bin</baseDirectory>
+    <includeSiteDirectory>false</includeSiteDirectory>
+  <moduleSets>
+    <moduleSet>
+      <useAllReactorProjects>true</useAllReactorProjects>
+    </moduleSet>
+  </moduleSets>
+  <dependencySets>
+    <dependencySet>
+      <includes>
+        <include>org.apache.flume:flume-jdbc-channel</include>
+      </includes>
+      <outputDirectory></outputDirectory>
+      <unpack>false</unpack>
+    </dependencySet>
+  </dependencySets>
+
+    <fileSets>
+        <fileSet>
+            <directory>../</directory>
+            <includes>
+                <include>LICENSE.txt</include>
+                <include>NOTICE.txt</include>
+                <include>RELEASE-NOTES.txt</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
diff --git a/flume-jdbc-dist/src/assembly/src.xml b/flume-jdbc-dist/src/assembly/src.xml
new file mode 100644
index 0000000..caa2b9d
--- /dev/null
+++ b/flume-jdbc-dist/src/assembly/src.xml
@@ -0,0 +1,45 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<assembly>
+  <id>src</id>
+  <formats>
+    <format>zip</format>
+    <format>tar.gz</format>
+  </formats>
+  <baseDirectory>apache-flume-jdbc-${project.version}-src</baseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>../</directory>
+
+      <excludes>
+        <exclude>**/target/**</exclude>
+        <exclude>**/.classpath</exclude>
+        <exclude>**/.project</exclude>
+        <exclude>**/.idea/**</exclude>
+        <exclude>**/*.iml</exclude>
+        <exclude>**/.settings/**</exclude>
+        <exclude>lib/**</exclude>
+        <exclude>**/.DS_Store</exclude>
+        <exclude>.mvn/wrapper/maven-wrapper.jar</exclude>
+      </excludes>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c43d6a7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>29</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-jdbc-parent</artifactId>
+  <name>Flume JDBC Parent</name>
+  <version>2.0.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <properties>
+    <ReleaseVersion>2.0.0</ReleaseVersion>
+    <ReleaseManager>Ralph Goers</ReleaseManager>
+    <ReleaseKey>B3D8E1BA</ReleaseKey>
+    <SigningUserName>rgoers@apache.org</SigningUserName>
+    <checksum-maven-plugin.version>1.11</checksum-maven-plugin.version>
+    <commons-dbcp.version>1.4</commons-dbcp.version>
+    <derby.version>10.14.2.0</derby.version>
+    <flume.version>1.11.0</flume.version>
+    <junit.version>4.13.2</junit.version>
+    <log4j.version>2.20.0</log4j.version>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <module.name>org.apache.flume.jdbc</module.name>
+    <mvn-gpg-plugin.version>1.6</mvn-gpg-plugin.version>
+    <mvn-javadoc-plugin.version>2.9</mvn-javadoc-plugin.version>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <rat.version>0.12</rat.version>
+    <slf4j.version>1.7.32</slf4j.version>
+    <spotbugs-maven-plugin.version>4.7.2.1</spotbugs-maven-plugin.version>
+    <spotless-maven-plugin.version>2.27.2</spotless-maven-plugin.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-jdbc-channel</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-sdk</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-core</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-configuration</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>commons-dbcp</groupId>
+        <artifactId>commons-dbcp</artifactId>
+        <version>${commons-dbcp.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.derby</groupId>
+        <artifactId>derby</artifactId>
+        <version>${derby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-1.2-api</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <version>${junit.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+        <version>${slf4j.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <inceptionYear>2022</inceptionYear>
+
+  <issueManagement>
+    <system>JIRA</system>
+    <url>https://issues.apache.org/jira/browse/FLUME</url>
+  </issueManagement>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+    </license>
+  </licenses>
+
+  <mailingLists>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-user/</archive>
+      <name>Flume User List</name>
+      <post>user@flume.apache.org</post>
+      <subscribe>user-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>user-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-dev/</archive>
+      <name>Flume Developer List</name>
+      <post>dev@flume.apache.org</post>
+      <subscribe>dev-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>dev-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-commits/</archive>
+      <name>Flume Commits</name>
+      <post>commits@flume.apache.org</post>
+      <subscribe>commits-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>commits-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+  </mailingLists>
+
+  <scm>
+    <url>https://gitbox.apache.org/repos/asf/flume-jdbc.git</url>
+    <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flume-jdbc.git</developerConnection>
+    <connection>scm:git:https://gitbox.apache.org/repos/asf/flume-jdbc.git</connection>
+  </scm>
+
+  <developers>
+    <developer>
+      <name>Ralph Goers</name>
+      <id>rgoers</id>
+      <email>rgoers@apache.org</email>
+      <organization>Intuit</organization>
+    </developer>
+  </developers>
+
+  <organization>
+    <name>Apache Software Foundation</name>
+    <url>http://www.apache.org</url>
+  </organization>
+  <modules>
+    <module>flume-jdbc-channel</module>
+  </modules>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>${rat.version}</version>
+        <configuration>
+          <excludes>
+            <exclude>**/.idea/</exclude>
+            <exclude>**/*.iml</exclude>
+            <exclude>src/main/resources/META-INF/services/**/*</exclude>
+            <exclude>**/nb-configuration.xml</exclude>
+            <exclude>.git/</exclude>
+            <exclude>patchprocess/</exclude>
+            <exclude>.gitignore</exclude>
+            <exclude>**/*.yml</exclude>
+            <exclude>**/*.yaml</exclude>
+            <exclude>**/*.json</exclude>
+            <!-- ASF jenkins box puts the Maven repo in our root directory. -->
+            <exclude>.repository/</exclude>
+            <exclude>**/*.diff</exclude>
+            <exclude>**/*.patch</exclude>
+            <exclude>**/*.avsc</exclude>
+            <exclude>**/*.avro</exclude>
+            <exclude>**/docs/**</exclude>
+            <exclude>**/test/resources/**</exclude>
+            <exclude>**/.settings/*</exclude>
+            <exclude>**/.classpath</exclude>
+            <exclude>**/.project</exclude>
+            <exclude>**/target/**</exclude>
+            <exclude>**/derby.log</exclude>
+            <exclude>**/metastore_db/</exclude>
+            <exclude>.mvn/**</exclude>
+            <exclude>**/exclude-pmd.properties</exclude>
+          </excludes>
+          <consoleOutput>true</consoleOutput>
+        </configuration>
+        <executions>
+          <execution>
+            <id>verify.rat</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>release</id>
+      <modules>
+        <module>flume-jdbc-dist</module>
+      </modules>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-gpg-plugin</artifactId>
+            <version>${mvn-gpg-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>sign</goal>
+                </goals>
+                <configuration>
+                  <keyname>${SigningUserName}</keyname>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <version>${mvn-javadoc-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>javadoc-jar</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>aggregate-jar</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <additionalparam>-Xdoclint:none</additionalparam>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>