You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/11/30 19:17:31 UTC

svn commit: r1208624 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src: main/java/org/apache/flume/channel/jdbc/ main/java/org/apache/flume/channel/jdbc/impl/ test/java/org/apache/flume/channel/jdbc/

Author: arvind
Date: Wed Nov 30 18:17:30 2011
New Revision: 1208624

URL: http://svn.apache.org/viewvc?rev=1208624&view=rev
Log:
FLUME-821. Support for creating indexes in JDBC channel.

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java Wed Nov 30 18:17:30 2011
@@ -45,6 +45,9 @@ public final class ConfigurationConstant
   public static final String CONFIG_CREATE_SCHEMA =
       PREFIX + "create.schema";
 
+  public static final String CONFIG_CREATE_INDEX =
+      PREFIX + "create.index";
+
   public static final String CONFIG_TX_ISOLATION_LEVEL =
       PREFIX + "transaction.isolation";
 

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -140,12 +140,18 @@ public class DerbySchemaHandler implemen
   private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
   private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
   private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
+  private static final String INDEX_FLE_CHANNEL_NAME = "IDX_FLE_CHANNEL";
+  private static final String INDEX_FLE_CHANNEL = SCHEMA_FLUME + "."
+      + INDEX_FLE_CHANNEL_NAME;
 
   private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
   private static final String TABLE_FL_PLSPILL = SCHEMA_FLUME + "."
       + TABLE_FL_PLSPILL_NAME;
   private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
   private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
+  private static final String INDEX_FLP_EVENT_NAME = "IDX_FLP_EVENT";
+  private static final String INDEX_FLP_EVENT = SCHEMA_FLUME + "."
+      + INDEX_FLP_EVENT_NAME;
 
   private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
   private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
@@ -156,18 +162,27 @@ public class DerbySchemaHandler implemen
   private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
   private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
   private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
+  private static final String INDEX_FLH_EVENT_NAME = "IDX_FLH_EVENT";
+  private static final String INDEX_FLH_EVENT = SCHEMA_FLUME + "."
+      + INDEX_FLH_EVENT_NAME;
 
   private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
   private static final String TABLE_FL_NMSPILL = SCHEMA_FLUME + "."
       + TABLE_FL_NMSPILL_NAME;
   private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
   private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
+  private static final String INDEX_FLN_HEADER_NAME = "IDX_FLN_HEADER";
+  private static final String INDEX_FLN_HEADER = SCHEMA_FLUME + "."
+      + INDEX_FLN_HEADER_NAME;
 
   private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
   private static final String TABLE_FL_VLSPILL = SCHEMA_FLUME + "."
       + TABLE_FL_VLSPILL_NAME;
   private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
   private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
+  private static final String INDEX_FLV_HEADER_NAME = "IDX_FLV_HEADER";
+  private static final String INDEX_FLV_HEADER = SCHEMA_FLUME + "."
+      + INDEX_FLV_HEADER_NAME;
 
   public static final String QUERY_CREATE_SCHEMA_FLUME
       = "CREATE SCHEMA " + SCHEMA_FLUME;
@@ -183,6 +198,10 @@ public class DerbySchemaHandler implemen
         + ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), "
         + COLUMN_FLE_SPILL + " BOOLEAN)";
 
+  public static final String QUERY_CREATE_INDEX_FLE_CHANNEL
+      = "CREATE INDEX " + INDEX_FLE_CHANNEL + " ON " + TABLE_FL_EVENT
+        + " (" + COLUMN_FLE_CHANNEL + ")";
+
   public static final String QUERY_CREATE_TABLE_FL_PLSPILL
       = "CREATE TABLE " + TABLE_FL_PLSPILL + " ( "
         + COLUMN_FLP_EVENT + " BIGINT, "
@@ -190,6 +209,10 @@ public class DerbySchemaHandler implemen
         + "FOREIGN KEY (" + COLUMN_FLP_EVENT + ") REFERENCES "
         + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
 
+  public static final String QUERY_CREATE_INDEX_FLP_EVENT
+      = "CREATE INDEX " + INDEX_FLP_EVENT + " ON " + TABLE_FL_PLSPILL
+        + " (" + COLUMN_FLP_EVENT + ")";
+
   public static final String QUERY_CREATE_TABLE_FL_HEADER
       = "CREATE TABLE " + TABLE_FL_HEADER + " ( "
         + COLUMN_FLH_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
@@ -204,6 +227,10 @@ public class DerbySchemaHandler implemen
         + "FOREIGN KEY (" + COLUMN_FLH_EVENT + ") REFERENCES "
         + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
 
+  public static final String QUERY_CREATE_INDEX_FLH_EVENT
+      = "CREATE INDEX " + INDEX_FLH_EVENT + " ON " + TABLE_FL_HEADER
+         + " (" + COLUMN_FLH_EVENT + ")";
+
   public static final String QUERY_CREATE_TABLE_FL_NMSPILL
       = "CREATE TABLE " + TABLE_FL_NMSPILL + " ( "
         + COLUMN_FLN_HEADER + " BIGINT, "
@@ -212,6 +239,10 @@ public class DerbySchemaHandler implemen
         + "FOREIGN KEY (" + COLUMN_FLN_HEADER + ") REFERENCES "
         + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
 
+  public static final String QUERY_CREATE_INDEX_FLN_HEADER
+      = "CREATE INDEX " + INDEX_FLN_HEADER + " ON " + TABLE_FL_NMSPILL
+         + " (" + COLUMN_FLN_HEADER + ")";
+
   public static final String QUERY_CREATE_TABLE_FL_VLSPILL
       = "CREATE TABLE " + TABLE_FL_VLSPILL + " ( "
         + COLUMN_FLV_HEADER + " BIGINT, "
@@ -220,6 +251,10 @@ public class DerbySchemaHandler implemen
         + "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
         + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
 
+  public static final String QUERY_CREATE_INDEX_FLV_HEADER
+      = "CREATE INDEX " + INDEX_FLV_HEADER + " ON " + TABLE_FL_VLSPILL
+         + " (" + COLUMN_FLV_HEADER + ")";
+
   public static final String COLUMN_LOOKUP_QUERY
       = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
          + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -344,13 +379,21 @@ public class DerbySchemaHandler implemen
   }
 
   @Override
-  public void createSchemaObjects() {
+  public void createSchemaObjects(boolean createIndex) {
     runQuery(QUERY_CREATE_SCHEMA_FLUME);
     runQuery(QUERY_CREATE_TABLE_FL_EVENT);
     runQuery(QUERY_CREATE_TABLE_FL_PLSPILL);
     runQuery(QUERY_CREATE_TABLE_FL_HEADER);
     runQuery(QUERY_CREATE_TABLE_FL_NMSPILL);
     runQuery(QUERY_CREATE_TABLE_FL_VLSPILL);
+
+    if (createIndex) {
+      runQuery(QUERY_CREATE_INDEX_FLE_CHANNEL);
+      runQuery(QUERY_CREATE_INDEX_FLH_EVENT);
+      runQuery(QUERY_CREATE_INDEX_FLP_EVENT);
+      runQuery(QUERY_CREATE_INDEX_FLN_HEADER);
+      runQuery(QUERY_CREATE_INDEX_FLV_HEADER);
+    }
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Wed Nov 30 18:17:30 2011
@@ -112,8 +112,16 @@ public class JdbcChannelProviderImpl imp
             + "schema and try again.");
       }
 
+      String createIndexFlag = context.getString(
+          ConfigurationConstants.CONFIG_CREATE_INDEX, "true");
+
+      boolean createIndex = Boolean.valueOf(createIndexFlag);
+      if (!createIndex) {
+        LOGGER.info("Index creation is disabled, indexes will not be created.");
+      }
+
       // Now create schema
-      schemaHandler.createSchemaObjects();
+      schemaHandler.createSchemaObjects(createIndex);
     }
 
     // Validate all schema objects are as expected

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -43,7 +43,7 @@ public class MySQLSchemaHandler implemen
   }
 
   @Override
-  public void createSchemaObjects() {
+  public void createSchemaObjects(boolean createIndex) {
     // TODO Auto-generated method stub
 
   }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Wed Nov 30 18:17:30 2011
@@ -39,9 +39,10 @@ public interface SchemaHandler {
 
   /**
    * Creates the schema.
-   * @param connection the connection to create schema for.
+   * @param createIndex a flag which indicates if indexes must be created during
+   *        the creation of the schema.
    */
-  public void createSchemaObjects();
+  public void createSchemaObjects(boolean createIndex);
 
   /**
    * Inserts the given persistent event into the database. The connection that

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1208624&r1=1208623&r2=1208624&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Wed Nov 30 18:17:30 2011
@@ -32,10 +32,16 @@ public class TestDerbySchemaHandlerQueri
            + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(64), "
            + "FLE_SPILL BOOLEAN)";
 
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL
+       = "CREATE INDEX FLUME.IDX_FLE_CHANNEL ON FLUME.FL_EVENT (FLE_CHANNEL)";
+
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL
        = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB, "
            + "FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID))";
 
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT
+       = "CREATE INDEX FLUME.IDX_FLP_EVENT ON FLUME.FL_PLSPILL (FLP_EVENT)";
+
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER
        = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS "
            + "IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
@@ -44,16 +50,25 @@ public class TestDerbySchemaHandlerQueri
            + "FLH_VLSPILL BOOLEAN, FOREIGN KEY (FLH_EVENT) "
            + "REFERENCES FLUME.FL_EVENT (FLE_ID))";
 
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT
+       = "CREATE INDEX FLUME.IDX_FLH_EVENT ON FLUME.FL_HEADER (FLH_EVENT)";
+
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL
        = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL "
            + "VARCHAR(32517), FOREIGN KEY (FLN_HEADER) REFERENCES "
            + "FLUME.FL_HEADER (FLH_ID))";
 
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER
+       = "CREATE INDEX FLUME.IDX_FLN_HEADER ON FLUME.FL_NMSPILL (FLN_HEADER)";
+
   public static final String EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL
        = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL "
             + "VARCHAR(32517), FOREIGN KEY (FLV_HEADER) REFERENCES "
             + "FLUME.FL_HEADER (FLH_ID))";
 
+  public static final String EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER
+       = "CREATE INDEX FLUME.IDX_FLV_HEADER ON FLUME.FL_VLSPILL (FLV_HEADER)";
+
   public static final String EXPECTED_COLUMN_LOOKUP_QUERY
       = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
           + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
@@ -122,18 +137,32 @@ public class TestDerbySchemaHandlerQueri
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
         EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLE_CHANNEL,
+        EXPECTED_QUERY_CREATE_INDEX_FLE_CHANNEL);
+
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLSPILL,
         EXPECTED_QUERY_CREATE_TABLE_FL_PLSPILL);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLP_EVENT,
+        EXPECTED_QUERY_CREATE_INDEX_FLP_EVENT);
+
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER,
         EXPECTED_QUERY_CREATE_TABLE_FL_HEADER);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLH_EVENT,
+        EXPECTED_QUERY_CREATE_INDEX_FLH_EVENT);
+
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_NMSPILL,
         EXPECTED_QUERY_CREATE_TABLE_FL_NMSPILL);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLN_HEADER,
+        EXPECTED_QUERY_CREATE_INDEX_FLN_HEADER);
+
     Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_VLSPILL,
         EXPECTED_QUERY_CREATE_TABLE_FL_VLSPILL);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_INDEX_FLV_HEADER,
+        EXPECTED_QUERY_CREATE_INDEX_FLV_HEADER);
 
     Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
         EXPECTED_COLUMN_LOOKUP_QUERY);