You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/13 10:00:59 UTC
svn commit: r1182709 - in
/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src:
main/java/org/apache/flume/channel/jdbc/
main/java/org/apache/flume/channel/jdbc/impl/
test/java/org/apache/flume/channel/jdbc/
Author: arvind
Date: Thu Oct 13 08:00:59 2011
New Revision: 1182709
URL: http://svn.apache.org/viewvc?rev=1182709&view=rev
Log:
Work related to JDBC channel implementation
Added:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java Thu Oct 13 08:00:59 2011
@@ -51,6 +51,43 @@ public final class ConfigurationConstant
public static final String CONFIG_MAX_CONNECTION =
PREFIX + "maximum.connections";
+ // Built in constants for JDBC Channel implementation
+
+ /**
+ * The length for payload bytes that will be stored inline. Payloads larger
+ * than this length will spill into BLOB.
+ */
+ public static int PAYLOAD_LENGTH_THRESHOLD = 16384; // 16kb
+
+ /**
+ * The length of header name in bytes that will be stored inline. Header
+ * names longer than this number will spill over into CLOB.
+ */
+ public static int HEADER_NAME_LENGTH_THRESHOLD = 251;
+
+ /**
+ * The length of header value in bytes that will be stored inline. Header
+ * values longer than this number will spill over into CLOB.
+ */
+ public static int HEADER_VALUE_LENGTH_THRESHOLD = 251;
+
+ /**
+ * The maximum length of channel name.
+ */
+ public static int CHANNEL_NAME_MAX_LENGTH = 64;
+
+ /**
+ * The maximum spill size for header names. Together with the value of
+ * HEADER_NAME_LENGTH_THRESHOLD this adds up to 32kb.
+ */
+ public static int HEADER_NAME_SPILL_MAX_LENGTH = 32517;
+
+ /**
+ * The maximum spill size for header values. Together with the value of
+ * HEADER_VALUE_LENGTH_THRESHOLD, this adds up to 32kb.
+ */
+ public static int HEADER_VALUE_SPILL_MAX_LENGTH = 32517;
+
private ConfigurationConstants() {
// Disable object creation
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Thu Oct 13 08:00:59 2011
@@ -27,10 +27,90 @@ import java.util.Set;
import javax.sql.DataSource;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
import org.apache.flume.channel.jdbc.JdbcChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * <p>
+ * Schema handler for Derby Database. This handler uses the following schema:
+ * </p>
+ *
+ * <p><strong><tt>FL_EVENT</tt></strong>: The main event table. This table
+ * contains an auto-generated event ID along with the first 16kb of payload
+ * data. If the payload is larger than 16kb, a spill indicator flag is set and
+ * the remaining data is recorded in the <tt>FL_PLSPILL</tt> table.</p>
+ * <pre>
+ * +-------------------------------+
+ * | FL_EVENT |
+ * +-------------------------------+
+ * | FLE_ID : BIGINT PK | (auto-gen sequence)
+ * | FLE_PAYLOAD: VARBINARY(16384) | (16kb payload)
+ * | FLE_SPILL : BOOLEAN | (true if payload spills)
+ * | FLE_CHANNEL: VARCHAR(64) |
+ * +-------------------------------+
+ * </pre>
+ *
+ * <p><strong><tt>FL_PLSPILL</tt></strong>: This table holds payloads in excess
+ * of 16kb and relates back to the <tt>FL_EVENT</tt> table using foreign key
+ * reference via <tt>FLP_EVENT</tt> column.</p>
+ * <pre>
+ * +---------------------+
+ * | FL_PLSPILL |
+ * +---------------------+
+ * | FLP_EVENT : BIGINT | (FK into FL_EVENT.FLE_ID)
+ * | FLP_SPILL : BLOB |
+ * +---------------------+
+ * </pre>
+ * <p><strong><tt>FL_HEADER</tt></strong>: The table that holds headers. This
+ * table contains the name-value pairs of headers, up to the first 251
+ * bytes of each. If a name is longer than 251 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in <tt>FL_NMSPILL</tt> table.
+ * Similarly, if the value is longer than 251 bytes, a spill indicator flag is
+ * set and the remaining bytes are recorded in <tt>FL_VLSPILL</tt> table. Each
+ * header record relates back to the <tt>FL_EVENT</tt> table using foreign key
+ * reference via <tt>FLH_EVENT</tt> column.</p>
+ * <pre>
+ * +--------------------------+
+ * | FL_HEADER |
+ * +--------------------------+
+ * | FLH_ID : BIGINT PK | (auto-gen sequence)
+ * | FLH_EVENT : BIGINT | (FK into FL_EVENT.FLE_ID)
+ * | FLH_NAME : VARCHAR(251)|
+ * | FLH_VALUE : VARCHAR(251)|
+ * | FLH_NMSPILL: BOOLEAN | (true if name spills)
+ * | FLH_VLSPILL: BOOLEAN | (true if value spills)
+ * +--------------------------+
+ * </pre>
+ * <p><strong><tt>FL_NMSPILL</tt></strong>: The table that holds header names
+ * in excess of 251 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLN_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_NMSPILL |
+ * +----------------------+
+ * | FLN_HEADER : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLN_SPILL : CLOB |
+ * +----------------------+
+ * </pre>
+ * <p><strong><tt>FL_VLSPILL</tt></strong>: The table that holds header values
+ * in excess of 251 bytes and relates back to the <tt>FL_HEADER</tt> table
+ * using foreign key reference via <tt>FLV_HEADER</tt> column.</p>
+ * <pre>
+ * +----------------------+
+ * | FL_VLSPILL |
+ * +----------------------+
+ * | FLV_HEADER : BIGINT | (FK into FL_HEADER.FLH_ID)
+ * | FLV_SPILL : CLOB |
+ * +----------------------+
+ * </pre>
+ *
+ * <p><strong>NOTE</strong>: The values that decide the spill boundary
+ * and storage length limits are defined in <tt>ConfigurationConstants</tt>
+ * class.</p>
+ * @see org.apache.flume.channel.jdbc.ConfigurationConstants
+ */
public class DerbySchemaHandler implements SchemaHandler {
private static final Logger LOGGER =
@@ -48,20 +128,35 @@ public class DerbySchemaHandler implemen
private static final String COLUMN_FLE_ID = "FLE_ID";
private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
+ private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
- private static final String TABLE_FL_PLEXT_NAME = "FL_PLEXT";
- private static final String TABLE_FL_PLEXT = SCHEMA_FLUME + "."
- + TABLE_FL_PLEXT_NAME;
- private static final String COLUMN_FLP_EVENTID = "FLP_EVENTID";
+ private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
+ private static final String TABLE_FL_PLSPILL = SCHEMA_FLUME + "."
+ + TABLE_FL_PLSPILL_NAME;
+ private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
+ TABLE_FL_HEADER_NAME;
- private static final String COLUMN_FLH_EVENTID = "FLH_EVENTID";
+ private static final String COLUMN_FLH_ID = "FLH_ID";
+ private static final String COLUMN_FLH_EVENT = "FLH_EVENT";
private static final String COLUMN_FLH_NAME = "FLH_NAME";
private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
+ private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
+ private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
+ private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
+ private static final String TABLE_FL_NMSPILL = SCHEMA_FLUME + "."
+ + TABLE_FL_NMSPILL_NAME;
+ private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
+ private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
+
+ private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
+ private static final String TABLE_FL_VLSPILL = SCHEMA_FLUME + "."
+ + TABLE_FL_VLSPILL_NAME;
+ private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
+ private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
public static final String QUERY_CREATE_SCHEMA_FLUME
= "CREATE SCHEMA " + SCHEMA_FLUME;
@@ -69,25 +164,51 @@ public class DerbySchemaHandler implemen
public static final String QUERY_CREATE_TABLE_FL_EVENT
= "CREATE TABLE " + TABLE_FL_EVENT + " ( "
+ COLUMN_FLE_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
- + "(START WITH 2, INCREMENT BY 1) PRIMARY KEY, "
- + COLUMN_FLE_PAYLOAD + " VARCHAR(16384) FOR BIT DATA, " // 16kb
- + COLUMN_FLE_CHANNEL + " VARCHAR(32))";
-
- public static final String QUERY_CREATE_TABLE_FL_PLEXT
- = "CREATE TABLE " + TABLE_FL_PLEXT + " ( "
- + COLUMN_FLP_EVENTID + " BIGINT, "
+ + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + COLUMN_FLE_PAYLOAD + " VARCHAR("
+ + ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD
+ + ") FOR BIT DATA, "
+ + COLUMN_FLE_CHANNEL + " VARCHAR("
+ + ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), "
+ + COLUMN_FLE_SPILL + " BOOLEAN)";
+
+ public static final String QUERY_CREATE_TABLE_FL_PLSPILL
+ = "CREATE TABLE " + TABLE_FL_PLSPILL + " ( "
+ + COLUMN_FLP_EVENT + " BIGINT, "
+ COLUMN_FLP_SPILL + " BLOB, "
- + "FOREIGN KEY (" + COLUMN_FLP_EVENTID + ") REFERENCES "
+ + "FOREIGN KEY (" + COLUMN_FLP_EVENT + ") REFERENCES "
+ TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
public static final String QUERY_CREATE_TABLE_FL_HEADER
= "CREATE TABLE " + TABLE_FL_HEADER + " ( "
- + COLUMN_FLH_EVENTID + " BIGINT, "
- + COLUMN_FLH_NAME + " VARCHAR(255), "
- + COLUMN_FLH_VALUE + " VARCHAR(255), "
- + "FOREIGN KEY (" + COLUMN_FLH_EVENTID + ") REFERENCES "
+ + COLUMN_FLH_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
+ + "(START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + COLUMN_FLH_EVENT + " BIGINT, "
+ + COLUMN_FLH_NAME + " VARCHAR("
+ + ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD + "), "
+ + COLUMN_FLH_VALUE + " VARCHAR("
+ + ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD + "), "
+ + COLUMN_FLH_NMSPILL + " BOOLEAN, "
+ + COLUMN_FLH_VLSPILL + " BOOLEAN, "
+ + "FOREIGN KEY (" + COLUMN_FLH_EVENT + ") REFERENCES "
+ TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
+ public static final String QUERY_CREATE_TABLE_FL_NMSPILL
+ = "CREATE TABLE " + TABLE_FL_NMSPILL + " ( "
+ + COLUMN_FLN_HEADER + " BIGINT, "
+ + COLUMN_FLN_SPILL + " VARCHAR("
+ + ConfigurationConstants.HEADER_NAME_SPILL_MAX_LENGTH + "), "
+ + "FOREIGN KEY (" + COLUMN_FLN_HEADER + ") REFERENCES "
+ + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+
+ public static final String QUERY_CREATE_TABLE_FL_VLSPILL
+ = "CREATE TABLE " + TABLE_FL_VLSPILL + " ( "
+ + COLUMN_FLV_HEADER + " BIGINT, "
+ + COLUMN_FLV_SPILL + " VARCHAR("
+ + ConfigurationConstants.HEADER_VALUE_SPILL_MAX_LENGTH + "), "
+ + "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
+ + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+
public static final String COLUMN_LOOKUP_QUERY =
"SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -100,7 +221,6 @@ public class DerbySchemaHandler implemen
this.dataSource = dataSource;
}
-
@Override
public boolean schemaExists() {
Connection connection = null;
@@ -149,20 +269,30 @@ public class DerbySchemaHandler implemen
public void createSchemaObjects() {
runQuery(QUERY_CREATE_SCHEMA_FLUME);
runQuery(QUERY_CREATE_TABLE_FL_EVENT);
- runQuery(QUERY_CREATE_TABLE_FL_PLEXT);
+ runQuery(QUERY_CREATE_TABLE_FL_PLSPILL);
runQuery(QUERY_CREATE_TABLE_FL_HEADER);
+ runQuery(QUERY_CREATE_TABLE_FL_NMSPILL);
+ runQuery(QUERY_CREATE_TABLE_FL_VLSPILL);
}
@Override
public void validateSchema() {
verifyTableStructure(SCHEMA_FLUME, TABLE_FL_EVENT_NAME,
- COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL);
+ COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL,
+ COLUMN_FLE_SPILL);
- verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLEXT_NAME,
- COLUMN_FLP_EVENTID, COLUMN_FLP_SPILL);
+ verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLSPILL_NAME,
+ COLUMN_FLP_EVENT, COLUMN_FLP_SPILL);
verifyTableStructure(SCHEMA_FLUME, TABLE_FL_HEADER_NAME,
- COLUMN_FLH_EVENTID, COLUMN_FLH_NAME, COLUMN_FLH_VALUE);
+ COLUMN_FLH_ID, COLUMN_FLH_EVENT, COLUMN_FLH_NAME, COLUMN_FLH_VALUE,
+ COLUMN_FLH_NMSPILL, COLUMN_FLH_VLSPILL);
+
+ verifyTableStructure(SCHEMA_FLUME, TABLE_FL_NMSPILL_NAME,
+ COLUMN_FLN_HEADER, COLUMN_FLN_SPILL);
+
+ verifyTableStructure(SCHEMA_FLUME, TABLE_FL_VLSPILL_NAME,
+ COLUMN_FLV_HEADER, COLUMN_FLV_SPILL);
}
private void verifyTableStructure(String schemaName, String tableName,
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Thu Oct 13 08:00:59 2011
@@ -171,8 +171,23 @@ public class JdbcChannelProviderImpl imp
@Override
public void persistEvent(String channelName, Event event) {
- // TODO Auto-generated method stub
+ PersistableEvent persistableEvent = new PersistableEvent(event);
+ Transaction tx = null;
+ try {
+ tx = getTransaction();
+ tx.begin();
+
+ // Persist the persistableEvent
+ tx.commit();
+ } catch (Exception ex) {
+ tx.rollback();
+ throw new JdbcChannelException("Failed to persist event", ex);
+ } finally {
+ if (tx != null) {
+ tx.close();
+ }
+ }
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java Thu Oct 13 08:00:59 2011
@@ -34,10 +34,6 @@ public class JdbcTransactionFactory exte
@Override
protected JdbcTransactionImpl initialValue() {
- try {
- return new JdbcTransactionImpl(dataSource.getConnection(), this);
- } catch (SQLException ex) {
- throw new JdbcChannelException("Unable to create connection", ex);
- }
+ return new JdbcTransactionImpl(dataSource, this);
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Thu Oct 13 08:00:59 2011
@@ -21,6 +21,8 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLWarning;
+import javax.sql.DataSource;
+
import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.JdbcChannelException;
import org.slf4j.Logger;
@@ -32,6 +34,7 @@ public class JdbcTransactionImpl impleme
private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcTransactionImpl.class);
+ private final DataSource dataSource;
private Connection connection;
private JdbcTransactionFactory txFactory;
boolean active = true;
@@ -39,16 +42,10 @@ public class JdbcTransactionImpl impleme
boolean rollback = false;
- protected JdbcTransactionImpl(Connection conn,
+ protected JdbcTransactionImpl(DataSource dataSource,
JdbcTransactionFactory factory) {
- connection = conn;
+ this.dataSource = dataSource;
txFactory = factory;
-
- try {
- connection.clearWarnings();
- } catch (SQLException ex) {
- LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
- }
}
@Override
@@ -56,6 +53,20 @@ public class JdbcTransactionImpl impleme
if (!active) {
throw new JdbcChannelException("Inactive transaction");
}
+ if (count == 0) {
+ // Lease a connection now
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException ex) {
+ throw new JdbcChannelException("Unable to lease connection", ex);
+ }
+ // Clear any prior warnings on the connection
+ try {
+ connection.clearWarnings();
+ } catch (SQLException ex) {
+ LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
+ }
+ }
count++;
LOGGER.debug("Tx count-begin: " + count + ", rollback: " + rollback);
}
@@ -77,6 +88,7 @@ public class JdbcTransactionImpl impleme
if (!active) {
throw new JdbcChannelException("Inactive transaction");
}
+ LOGGER.warn("Marking transaction for rollback");
rollback = true;
LOGGER.debug("Tx count-rollback: " + count + ", rollback: " + rollback);
}
Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1182709&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Thu Oct 13 08:00:59 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+
+public class PersistableEvent {
+
+ private long eventId;
+ private byte[] payload;
+ private byte[] spill;
+ private List<HeaderEntry> headers;
+
+ public PersistableEvent(Event event) {
+
+ byte[] givenPayload = event.getBody();
+ if (givenPayload.length < ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD) {
+ payload = Arrays.copyOf(givenPayload, givenPayload.length);
+ spill = null;
+ } else {
+ payload = Arrays.copyOfRange(givenPayload, 0,
+ ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD);
+ spill = Arrays.copyOfRange(givenPayload,
+ ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD, givenPayload.length);
+ }
+
+ Map<String, String> headerMap = event.getHeaders();
+ if (headerMap != null && headerMap.size() > 0) {
+ headers = new ArrayList<HeaderEntry>();
+ for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
+ headers.add(new HeaderEntry(name, value));
+ }
+ }
+ }
+
+ public byte[] getPayload() {
+ byte[] result = null;
+ if (spill == null) {
+ result = Arrays.copyOf(payload, payload.length);
+ } else {
+ result = new byte[payload.length + spill.length];
+ System.arraycopy(payload, 0, result, 0, payload.length);
+ System.arraycopy(spill, 0, result, payload.length, spill.length);
+ }
+
+ return result;
+ }
+
+ public Map<String, String> getHeaders() {
+ Map<String, String> headerMap = null;
+ if (headers != null) {
+ headerMap = new HashMap<String, String>();
+ for (HeaderEntry entry : headers) {
+ headerMap.put(entry.getName(), entry.getValue());
+ }
+ }
+
+ return headerMap;
+ }
+
+ public static class HeaderEntry {
+
+ private SpillableString nameString;
+ private SpillableString valueString;
+
+ public HeaderEntry(String name, String value) {
+ nameString = new SpillableString(name,
+ ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD);
+ valueString = new SpillableString(value,
+ ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD);
+ }
+
+ public String getName() {
+ return nameString.getString();
+ }
+
+ public String getValue() {
+ return valueString.getString();
+ }
+ }
+
+ private static class SpillableString {
+
+ private String base;
+ private String spill;
+
+ public SpillableString(String string, int threshold) {
+ if (string.getBytes().length < threshold) {
+ base = string;
+ } else {
+ // Find the largest number of leading characters whose encoded byte
+ // length stays below the given threshold
+ int currentIndex = threshold / 3; // Assuming 3 byte encoding worst case
+ int lastIndex = currentIndex;
+ while (true) {
+ int length = string.substring(0, currentIndex).getBytes().length;
+ if (length < threshold) {
+ lastIndex = currentIndex;
+ currentIndex++;
+ } else {
+ break;
+ }
+ }
+ base = string.substring(0, lastIndex);
+ spill = string.substring(lastIndex);
+ }
+ }
+
+ public String getBase() {
+ return base;
+ }
+
+ public String getSpill() {
+ return spill;
+ }
+
+ public String getString() {
+ if (spill == null) {
+ return base;
+ }
+ return base + spill;
+ }
+ }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Thu Oct 13 08:00:59 2011
@@ -28,18 +28,32 @@ public class TestDerbySchemaHandlerQueri
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_EVENT
= "CREATE TABLE FLUME.FL_EVENT ( FLE_ID BIGINT GENERATED ALWAYS AS "
- + "IDENTITY (START WITH 2, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
- + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(32))";
-
- public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT
- = "CREATE TABLE FLUME.FL_PLEXT ( FLP_EVENTID BIGINT, FLP_SPILL BLOB, "
- + "FOREIGN KEY (FLP_EVENTID) REFERENCES FLUME.FL_EVENT (FLE_ID))";
+ + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
+ + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(64), "
+ + "FLE_SPILL BOOLEAN)";
+
+ public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL
+ = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB, "
+ + "FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID))";
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER
- = "CREATE TABLE FLUME.FL_HEADER ( FLH_EVENTID BIGINT, FLH_NAME "
- + "VARCHAR(255), FLH_VALUE VARCHAR(255), FOREIGN KEY (FLH_EVENTID) "
+ = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
+ + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ + "FLH_EVENT BIGINT, FLH_NAME VARCHAR(251), "
+ + "FLH_VALUE VARCHAR(251), FLH_NMSPILL BOOLEAN, "
+ + "FLH_VLSPILL BOOLEAN, FOREIGN KEY (FLH_EVENT) "
+ "REFERENCES FLUME.FL_EVENT (FLE_ID))";
+ public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL
+ = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
+ + "VARCHAR(32517), FOREIGN KEY (FLN_HEADER) REFERENCES "
+ + "FLUME.FL_HEADER (FLH_ID))";
+
+ public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL
+ = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
+ + "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
+ + "FLUME.FL_HEADER (FLH_ID))";
+
@Test
public void testCreateQueries() {
@@ -49,12 +63,18 @@ public class TestDerbySchemaHandlerQueri
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
- Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLEXT,
- EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL,
+ EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL);
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER,
EXPECTED_QUERY_CREATE_TABLE_FL_HEADER);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL,
+ EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL);
+
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
+ EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
+
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1182709&r1=1182708&r2=1182709&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Thu Oct 13 08:00:59 2011
@@ -4,8 +4,11 @@ import java.io.File;
import java.io.IOException;
import java.util.Properties;
+import junit.framework.Assert;
+
import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -18,6 +21,7 @@ public class TestJdbcChannelProvider {
private Properties derbyProps = new Properties();
private File derbyDbDir;
+ private JdbcChannelProviderImpl provider;
@Before
public void setUp() throws IOException {
@@ -52,16 +56,39 @@ public class TestJdbcChannelProvider {
@Test
public void testDerbySetup() {
- JdbcChannelProviderImpl jdbcProviderImpl =
- new JdbcChannelProviderImpl();
+ provider = new JdbcChannelProviderImpl();
+
+ provider.initialize(derbyProps);
+
+ Transaction tx1 = provider.getTransaction();
+ tx1.begin();
+
+ Transaction tx2 = provider.getTransaction();
+
+ Assert.assertSame(tx1, tx2);
+ tx2.begin();
+ tx2.close();
+ tx1.close();
- jdbcProviderImpl.initialize(derbyProps);
+ Transaction tx3 = provider.getTransaction();
+ Assert.assertNotSame(tx1, tx3);
- Transaction tx = jdbcProviderImpl.getTransaction();
- tx.begin();
- tx.begin();
- tx.close();
- tx.close();
- jdbcProviderImpl.close();
+ tx3.begin();
+ tx3.close();
+
+ provider.close();
+ provider = null;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (provider != null) {
+ try {
+ provider.close();
+ } catch (Exception ex) {
+ LOGGER.error("Unable to close provider", ex);
+ }
+ }
+ provider = null;
}
}
Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1182709&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Thu Oct 13 08:00:59 2011
@@ -0,0 +1,166 @@
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestPersistentEvent {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TestPersistentEvent.class);
+ // Alphabet that generateString() samples from when building random header names and values.
+ private static final String[] CHARS = new String[] {
+ "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r", // lowercase letters
+ "s","t","u","v","w","x","y","z",
+ "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R", // uppercase letters
+ "S","T","U","V","W","X","Y","Z",
+ "0","1","2","3","4","5","6","7","8","9", // digits
+ "!","@","#","$","%","^","&","*","(",")", // symbols
+ "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|", // brackets, quotes and other punctuation
+ };
+
+ /**
+  * Wraps events of various sizes in a {@link PersistableEvent} and checks,
+  * via {@link #runTest}, that payload and headers are reported unchanged.
+  *
+  * <p>Payload sizes, header-name lengths and header-value lengths include
+  * values just below, at and just above the corresponding
+  * {@link ConfigurationConstants} thresholds. All content is random; the
+  * seed is logged so that a failing run can be reproduced.
+  */
+ @Test
+ public void testMarshalling() {
+   // Record the seed: without it a failure caused by one particular random
+   // input cannot be replayed.
+   long seed = System.currentTimeMillis();
+   LOGGER.info("testMarshalling random seed: {}", seed);
+   Random rnd = new Random(seed);
+
+   int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+   int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
+   int payloadLimit = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+   // Distances from a threshold that are exercised for names and values,
+   // paired index-by-index with the fixed single-character counterpart.
+   int[] offsets = {-21, -2, -1, 0, 1, 2, 21};
+   String[] fixed = {"w", "x", "y", "z", "a", "b", "c"};
+
+   // Tiny payloads: no header map at all, then an empty header map.
+   runTest(randomBytes(rnd, 1), null);
+   runTest(randomBytes(rnd, 2), new HashMap<String, String>());
+
+   // Payload just under the threshold with a single minimal header.
+   byte[] s3 = randomBytes(rnd, payloadLimit - 2);
+   Map<String, String> m3 = new HashMap<String, String>();
+   m3.put(generateString(rnd, 1), generateString(rnd, 1));
+   runTest(s3, m3);
+
+   // Header names around the name threshold, short fixed values.
+   byte[] s4 = randomBytes(rnd, payloadLimit - 1);
+   Map<String, String> m4 = new HashMap<String, String>();
+   for (int i = 0; i < offsets.length; i++) {
+     m4.put(generateString(rnd, nameLimit + offsets[i]), fixed[i]);
+   }
+   runTest(s4, m4);
+
+   // Short fixed names, header values around the value threshold.
+   byte[] s5 = randomBytes(rnd, payloadLimit);
+   Map<String, String> m5 = new HashMap<String, String>();
+   for (int i = 0; i < offsets.length; i++) {
+     m5.put(fixed[i], generateString(rnd, valLimit + offsets[i]));
+   }
+   runTest(s5, m5);
+
+   // Names and values both around their thresholds.
+   byte[] s6 = randomBytes(rnd, payloadLimit + 1);
+   Map<String, String> m6 = new HashMap<String, String>();
+   for (int i = 0; i < offsets.length; i++) {
+     m6.put(generateString(rnd, nameLimit + offsets[i]),
+         generateString(rnd, valLimit + offsets[i]));
+   }
+   runTest(s6, m6);
+
+   // Payloads above the threshold, without headers.
+   runTest(randomBytes(rnd, payloadLimit + 2), null);
+   runTest(randomBytes(rnd, payloadLimit + 27), null);
+ }
+
+ /**
+  * Creates a byte array of the given length filled with random content.
+  *
+  * @param rnd source of randomness
+  * @param length number of bytes to produce
+  * @return a new array of {@code length} random bytes
+  */
+ private static byte[] randomBytes(Random rnd, int length) {
+   byte[] bytes = new byte[length];
+   rnd.nextBytes(bytes);
+   return bytes;
+ }
+
+ /**
+  * Builds a random string from the {@code CHARS} alphabet.
+  *
+  * @param rnd source of randomness
+  * @param size number of characters to generate; zero or a negative value
+  *        yields an empty string
+  * @return the generated string
+  */
+ private String generateString(Random rnd, int size) {
+   StringBuilder sb = new StringBuilder();
+   for (int i = 0; i < size; i++) {
+     // nextInt(bound) is always within [0, bound).
+     // Math.abs(rnd.nextInt()) % length is not a safe substitute:
+     // Math.abs(Integer.MIN_VALUE) stays negative and would produce a
+     // negative array index.
+     sb.append(CHARS[rnd.nextInt(CHARS.length)]);
+   }
+   LOGGER.debug("String: {}", sb);
+   return sb.toString();
+ }
+
+
+
+ /**
+  * Wraps the given payload and headers in a {@link PersistableEvent} and
+  * asserts that the event reports them back unchanged.
+  *
+  * <p>A {@code null} header map and an empty one are treated as equivalent,
+  * on both the expected and the actual side.
+  *
+  * @param payload event body handed to the event under test
+  * @param headers event headers handed to the event under test; may be
+  *        {@code null}, and is not modified by the comparison
+  */
+ private void runTest(byte[] payload, Map<String, String> headers) {
+   PersistableEvent pe = new PersistableEvent(new MockEvent(payload, headers));
+   Assert.assertArrayEquals(payload, pe.getPayload());
+
+   Map<String, String> actual = pe.getHeaders();
+
+   // Compare against a private copy: the caller's map is left untouched,
+   // a null input is normalised to "no headers", and nothing is removed
+   // from a map that the event under test might be exposing directly.
+   Map<String, String> expected = new HashMap<String, String>();
+   if (headers != null) {
+     expected.putAll(headers);
+   }
+
+   if (actual == null) {
+     Assert.assertTrue(expected.isEmpty());
+   } else {
+     // Map equality covers size, key set and per-key values in one check.
+     Assert.assertEquals(expected, actual);
+   }
+ }
+
+
+
+ /**
+  * Minimal {@link Event} used as input for the event under test. The body
+  * and headers supplied at construction are handed back as-is (no copies are
+  * made); both setters do nothing.
+  */
+ private static class MockEvent implements Event {
+
+   private final Map<String, String> headers;
+   private final byte[] payload;
+
+   /**
+    * @param body bytes returned by {@link #getBody()}
+    * @param eventHeaders map returned by {@link #getHeaders()}
+    */
+   private MockEvent(byte[] body, Map<String, String> eventHeaders) {
+     headers = eventHeaders;
+     payload = body;
+   }
+
+   @Override
+   public byte[] getBody() {
+     return payload;
+   }
+
+   @Override
+   public void setBody(byte[] body) {
+     // No-op: the body is fixed at construction time.
+   }
+
+   @Override
+   public Map<String, String> getHeaders() {
+     return headers;
+   }
+
+   @Override
+   public void setHeaders(Map<String, String> headers) {
+     // No-op: the headers are fixed at construction time.
+   }
+ }
+}