You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/11/30 19:17:31 UTC
svn commit: r1208624 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src: main/java/org/apache/flume/channel/jdbc/ main/java/org/apache/flume/channel/jdbc/impl/ test/java/org/apache/flume/channel/jdbc/
Author: arvind
Date: Wed Nov 30 18:17:30 2011
New Revision: 1208624
URL: http://svn.apache.org/viewvc?rev=1208624&view=rev
Log:
FLUME-821. Support for creating indexes in JDBC channel.
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java Wed Nov 30 18:17:30 2011
@@ -45,6 +45,9 @@ public final class ConfigurationConstant
public static final String CONFIG_CREATE_SCHEMA =
PREFIX + "create.schema";
+ public static final String CONFIG_CREATE_INDEX =
+ PREFIX + "create.index";
+
public static final String CONFIG_TX_ISOLATION_LEVEL =
PREFIX + "transaction.isolation";
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -140,12 +140,18 @@ public class DerbySchemaHandler implemen
private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
+ private static final String INDEX_FLE_CHANNEL_NAME = "IDX_FLE_CHANNEL";
+ private static final String INDEX_FLE_CHANNEL = SCHEMA_FLUME + "."
+ + INDEX_FLE_CHANNEL_NAME;
private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
private static final String TABLE_FL_PLSPILL = SCHEMA_FLUME + "."
+ TABLE_FL_PLSPILL_NAME;
private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
+ private static final String INDEX_FLP_EVENT_NAME = "IDX_FLP_EVENT";
+ private static final String INDEX_FLP_EVENT = SCHEMA_FLUME + "."
+ + INDEX_FLP_EVENT_NAME;
private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
@@ -156,18 +162,27 @@ public class DerbySchemaHandler implemen
private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
+ private static final String INDEX_FLH_EVENT_NAME = "IDX_FLH_EVENT";
+ private static final String INDEX_FLH_EVENT = SCHEMA_FLUME + "."
+ + INDEX_FLH_EVENT_NAME;
private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
private static final String TABLE_FL_NMSPILL = SCHEMA_FLUME + "."
+ TABLE_FL_NMSPILL_NAME;
private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
+ private static final String INDEX_FLN_HEADER_NAME = "IDX_FLN_HEADER";
+ private static final String INDEX_FLN_HEADER = SCHEMA_FLUME + "."
+ + INDEX_FLN_HEADER_NAME;
private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
private static final String TABLE_FL_VLSPILL = SCHEMA_FLUME + "."
+ TABLE_FL_VLSPILL_NAME;
private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
+ private static final String INDEX_FLV_HEADER_NAME = "IDX_FLV_HEADER";
+ private static final String INDEX_FLV_HEADER = SCHEMA_FLUME + "."
+ + INDEX_FLV_HEADER_NAME;
public static final String QUERY_CREATE_SCHEMA_FLUME
= "CREATE SCHEMA " + SCHEMA_FLUME;
@@ -183,6 +198,10 @@ public class DerbySchemaHandler implemen
+ ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), "
+ COLUMN_FLE_SPILL + " BOOLEAN)";
+ public static final String QUERY_CREATE_INDEX_FLE_CHANNEL
+ = "CREATE INDEX " + INDEX_FLE_CHANNEL + " ON " + TABLE_FL_EVENT
+ + " (" + COLUMN_FLE_CHANNEL + ")";
+
public static final String QUERY_CREATE_TABLE_FL_PLSPILL
= "CREATE TABLE " + TABLE_FL_PLSPILL + " ( "
+ COLUMN_FLP_EVENT + " BIGINT, "
@@ -190,6 +209,10 @@ public class DerbySchemaHandler implemen
+ "FOREIGN KEY (" + COLUMN_FLP_EVENT + ") REFERENCES "
+ TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
+ public static final String QUERY_CREATE_INDEX_FLP_EVENT
+ = "CREATE INDEX " + INDEX_FLP_EVENT + " ON " + TABLE_FL_PLSPILL
+ + " (" + COLUMN_FLP_EVENT + ")";
+
public static final String QUERY_CREATE_TABLE_FL_HEADER
= "CREATE TABLE " + TABLE_FL_HEADER + " ( "
+ COLUMN_FLH_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
@@ -204,6 +227,10 @@ public class DerbySchemaHandler implemen
+ "FOREIGN KEY (" + COLUMN_FLH_EVENT + ") REFERENCES "
+ TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
+ public static final String QUERY_CREATE_INDEX_FLH_EVENT
+ = "CREATE INDEX " + INDEX_FLH_EVENT + " ON " + TABLE_FL_HEADER
+ + " (" + COLUMN_FLH_EVENT + ")";
+
public static final String QUERY_CREATE_TABLE_FL_NMSPILL
= "CREATE TABLE " + TABLE_FL_NMSPILL + " ( "
+ COLUMN_FLN_HEADER + " BIGINT, "
@@ -212,6 +239,10 @@ public class DerbySchemaHandler implemen
+ "FOREIGN KEY (" + COLUMN_FLN_HEADER + ") REFERENCES "
+ TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+ public static final String QUERY_CREATE_INDEX_FLN_HEADER
+ = "CREATE INDEX " + INDEX_FLN_HEADER + " ON " + TABLE_FL_NMSPILL
+ + " (" + COLUMN_FLN_HEADER + ")";
+
public static final String QUERY_CREATE_TABLE_FL_VLSPILL
= "CREATE TABLE " + TABLE_FL_VLSPILL + " ( "
+ COLUMN_FLV_HEADER + " BIGINT, "
@@ -220,6 +251,10 @@ public class DerbySchemaHandler implemen
+ "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
+ TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
+ public static final String QUERY_CREATE_INDEX_FLV_HEADER
+ = "CREATE INDEX " + INDEX_FLV_HEADER + " ON " + TABLE_FL_VLSPILL
+ + " (" + COLUMN_FLV_HEADER + ")";
+
public static final String COLUMN_LOOKUP_QUERY
= "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -344,13 +379,21 @@ public class DerbySchemaHandler implemen
}
@Override
- public void createSchemaObjects() {
+ public void createSchemaObjects(boolean createIndex) {
runQuery(QUERY_CREATE_SCHEMA_FLUME);
runQuery(QUERY_CREATE_TABLE_FL_EVENT);
runQuery(QUERY_CREATE_TABLE_FL_PLSPILL);
runQuery(QUERY_CREATE_TABLE_FL_HEADER);
runQuery(QUERY_CREATE_TABLE_FL_NMSPILL);
runQuery(QUERY_CREATE_TABLE_FL_VLSPILL);
+
+ if (createIndex) {
+ runQuery(QUERY_CREATE_INDEX_FLE_CHANNEL);
+ runQuery(QUERY_CREATE_INDEX_FLH_EVENT);
+ runQuery(QUERY_CREATE_INDEX_FLP_EVENT);
+ runQuery(QUERY_CREATE_INDEX_FLN_HEADER);
+ runQuery(QUERY_CREATE_INDEX_FLV_HEADER);
+ }
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Wed Nov 30 18:17:30 2011
@@ -112,8 +112,16 @@ public class JdbcChannelProviderImpl imp
+ "schema and try again.");
}
+ String createIndexFlag = context.getString(
+ ConfigurationConstants.CONFIG_CREATE_INDEX, "true");
+
+ boolean createIndex = Boolean.valueOf(createIndexFlag);
+ if (!createIndex) {
+ LOGGER.info("Index creation is disabled, indexes will not be created.");
+ }
+
// Now create schema
- schemaHandler.createSchemaObjects();
+ schemaHandler.createSchemaObjects(createIndex);
}
// Validate all schema objects are as expected
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -43,7 +43,7 @@ public class MySQLSchemaHandler implemen
}
@Override
- public void createSchemaObjects() {
+ public void createSchemaObjects(boolean createIndex) {
// TODO Auto-generated method stub
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -39,9 +39,10 @@ public interface SchemaHandler {
/**
* Creates the schema.
- * @param connection the connection to create schema for.
+ * @param createIndex a flag which indicates if indexes must be created during
+ * the creation of the schema.
*/
- public void createSchemaObjects();
+ public void createSchemaObjects(boolean createIndex);
/**
* Inserts the given persistent event into the database. The connection that
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Wed Nov 30 18:17:30 2011
@@ -32,10 +32,16 @@ public class TestDerbySchemaHandlerQueri
+ "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(64), "
+ "FLE_SPILL BOOLEAN)";
+ public static final String EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL
+ = "CREATE INDEX FLUME.IDX_FLE_CHANNEL ON FLUME.FL_EVENT (FLE_CHANNEL)";
+
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL
= "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB, "
+ "FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID))";
+ public static final String EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT
+ = "CREATE INDEX FLUME.IDX_FLP_EVENT ON FLUME.FL_PLSPILL (FLP_EVENT)";
+
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER
= "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
+ "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
@@ -44,16 +50,25 @@ public class TestDerbySchemaHandlerQueri
+ "FLH_VLSPILL BOOLEAN, FOREIGN KEY (FLH_EVENT) "
+ "REFERENCES FLUME.FL_EVENT (FLE_ID))";
+ public static final String EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT
+ = "CREATE INDEX FLUME.IDX_FLH_EVENT ON FLUME.FL_HEADER (FLH_EVENT)";
+
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL
= "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
+ "VARCHAR(32517), FOREIGN KEY (FLN_HEADER) REFERENCES "
+ "FLUME.FL_HEADER (FLH_ID))";
+ public static final String EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER
+ = "CREATE INDEX FLUME.IDX_FLN_HEADER ON FLUME.FL_NMSPILL (FLN_HEADER)";
+
public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL
= "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
+ "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
+ "FLUME.FL_HEADER (FLH_ID))";
+ public static final String EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER
+ = "CREATE INDEX FLUME.IDX_FLV_HEADER ON FLUME.FL_VLSPILL (FLV_HEADER)";
+
public static final String EXPECTED_COLUMN_LOOKUP_QUERY
= "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -122,18 +137,32 @@ public class TestDerbySchemaHandlerQueri
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLE_CHANNEL,
+ EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL);
+
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL,
EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLP_EVENT,
+ EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT);
+
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER,
EXPECTED_QUERY_CREATE_TABLE_FL_HEADER);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLH_EVENT,
+ EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT);
+
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL,
EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLN_HEADER,
+ EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER);
+
Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
+ Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLV_HEADER,
+ EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER);
Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
EXPECTED_COLUMN_LOOKUP_QUERY);