You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/28 23:51:02 UTC

svn commit: r1190609 - in /incubator/flume/branches/flume-728: flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/ flume-ng-channels/fl...

Author: arvind
Date: Fri Oct 28 21:51:02 2011
New Revision: 1190609

URL: http://svn.apache.org/viewvc?rev=1190609&view=rev
Log:
FLUME-817. JdbcChannel cannot be created by DefaultChannelfactory.

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Fri Oct 28 21:51:02 2011
@@ -17,33 +17,28 @@
  */
 package org.apache.flume.channel.jdbc;
 
-import java.util.Properties;
-
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
 import org.apache.log4j.Logger;
 
 /**
  * <p>A JDBC based channel implementation.</p>
  */
-public class JdbcChannel implements Channel {
+public class JdbcChannel implements Channel, Configurable {
 
   private static final Logger LOG = Logger.getLogger(JdbcChannel.class);
 
-  private final JdbcChannelProvider provider;
-  private final String name;
+  private JdbcChannelProvider provider;
+  private String name;
 
   /**
-   * Instantiates a new JDBC Channel with the given properties.
-   * @param configuration
+   * Instantiates a new JDBC Channel.
    */
-  public JdbcChannel(String name, Properties configuration) {
-    provider = JdbcChannelProviderFactory.getProvider(configuration);
-    this.name = name;
-
-    LOG.info("JDBC Channel initialized: " + name);
+  public JdbcChannel() {
   }
 
   @Override
@@ -63,7 +58,9 @@ public class JdbcChannel implements Chan
 
   @Override
   public void shutdown() {
-    // TODO Auto-generated method stub
+    JdbcChannelProviderFactory.releaseProvider(name);
+    provider = null;
+    name = null;
   }
 
   @Override
@@ -74,4 +71,14 @@ public class JdbcChannel implements Chan
   private JdbcChannelProvider getProvider() {
     return provider;
   }
+
+  @Override
+  public void configure(Context context) {
+    // FIXME - allow name to be specified via the context
+    this.name = "jdbc";
+
+    provider = JdbcChannelProviderFactory.getProvider(context, name);
+
+    LOG.info("JDBC Channel initialized: " + name);
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java Fri Oct 28 21:51:02 2011
@@ -17,8 +17,7 @@
  */
 package org.apache.flume.channel.jdbc;
 
-import java.util.Properties;
-
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 
@@ -32,7 +31,7 @@ public interface JdbcChannelProvider {
    * the channel can be used in any way.
    * @param properties the configuration for the system
    */
-  public void initialize(Properties properties);
+  public void initialize(Context context);
 
   /**
    * Deinitializes the channel provider. Once this method is called, the

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java Fri Oct 28 21:51:02 2011
@@ -1,13 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.channel.jdbc;
 
-import java.util.Properties;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
 
 public final class JdbcChannelProviderFactory {
 
-  public static JdbcChannelProvider getProvider(Properties properties) {
-    return null;
+  private static Set<String> INSTANCES = new HashSet<String>();
+  private static JdbcChannelProvider PROVIDER;
+
+  public static synchronized JdbcChannelProvider getProvider(
+      Context context, String name) {
+    if (PROVIDER == null) {
+      PROVIDER = new JdbcChannelProviderImpl();
+      PROVIDER.initialize(context);
+    }
+
+    if (!INSTANCES.add(name)) {
+      throw new JdbcChannelException("Attempt to initialize multiple "
+           + "channels with same name: " + name);
+    }
+
+    return PROVIDER;
   }
 
+  public static synchronized void releaseProvider(String name) {
+    if (!INSTANCES.remove(name)) {
+      throw new JdbcChannelException("Attempt to release non-existant channel: "
+          + name);
+    }
+
+    if (INSTANCES.size() == 0) {
+      // Deinitialize the provider
+      PROVIDER.close();
+      PROVIDER = null;
+    }
+  }
 
   private JdbcChannelProviderFactory() {
     // disable explicit object creation

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Oct 28 21:51:02 2011
@@ -35,8 +35,8 @@ import org.apache.commons.dbcp.PoolingDa
 import org.apache.commons.pool.KeyedObjectPoolFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
 import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.ConfigurationConstants;
 import org.apache.flume.channel.jdbc.DatabaseType;
 import org.apache.flume.channel.jdbc.JdbcChannelException;
@@ -85,18 +85,18 @@ public class JdbcChannelProviderImpl imp
   private String driverClassName;
 
   @Override
-  public void initialize(Properties properties) {
+  public void initialize(Context context) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Initializing JDBC Channel provider with props: "
-          + properties);
+          + context);
     }
 
-    initializeDataSource(properties);
-    initializeSchema(properties);
+    initializeDataSource(context);
+    initializeSchema(context);
   }
 
-  private void initializeSchema(Properties properties) {
-    String createSchemaFlag = properties.getProperty(
+  private void initializeSchema(Context context) {
+    String createSchemaFlag = context.getString(
         ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
 
     boolean createSchema = Boolean.valueOf(createSchemaFlag);
@@ -225,23 +225,21 @@ public class JdbcChannelProviderImpl imp
    * Initializes the datasource and the underlying connection pool.
    * @param properties
    */
-  private void initializeDataSource(Properties properties) {
-    driverClassName = properties.getProperty(
+  private void initializeDataSource(Context context) {
+    driverClassName = context.getString(
         ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS);
 
-    connectUrl = properties.getProperty(ConfigurationConstants.CONFIG_URL);
+    connectUrl = context.getString(ConfigurationConstants.CONFIG_URL);
 
 
-    String userName =
-        properties.getProperty(ConfigurationConstants.CONFIG_USERNAME);
+    String userName = context.getString(ConfigurationConstants.CONFIG_USERNAME);
 
-    String password =
-        properties.getProperty(ConfigurationConstants.CONFIG_PASSWORD);
+    String password = context.getString(ConfigurationConstants.CONFIG_PASSWORD);
 
-    String jdbcPropertiesFile = properties.getProperty(
+    String jdbcPropertiesFile = context.getString(
         ConfigurationConstants.CONFIG_JDBC_PROPERTIES_FILE);
 
-    String dbTypeName = properties.getProperty(
+    String dbTypeName = context.getString(
         ConfigurationConstants.CONFIG_DATABASE_TYPE);
 
     // If connect URL is not specified, use embedded Derby
@@ -379,7 +377,7 @@ public class JdbcChannelProviderImpl imp
     }
 
     // Transaction Isolation
-    String txIsolation = properties.getProperty(
+    String txIsolation = context.getString(
         ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
         TransactionIsolation.READ_COMMITTED.getName());
 
@@ -394,7 +392,7 @@ public class JdbcChannelProviderImpl imp
 
     connectionPool = new GenericObjectPool();
 
-    String maxActiveConnections = properties.getProperty(
+    String maxActiveConnections = context.getString(
         ConfigurationConstants.CONFIG_MAX_CONNECTION, "10");
 
     int maxActive = 10;

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 28 21:51:02 2011
@@ -32,8 +32,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
@@ -49,20 +49,20 @@ public class TestJdbcChannelProvider {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TestJdbcChannelProvider.class);
 
-  private Properties derbyProps = new Properties();
+  private Context derbyCtx = new Context();
   private File derbyDbDir;
   private JdbcChannelProviderImpl provider;
 
   @Before
   public void setUp() throws IOException {
-    derbyProps.clear();
-    derbyProps.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
-    derbyProps.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
-    derbyProps.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
+    derbyCtx.clear();
+    derbyCtx.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
+    derbyCtx.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
+    derbyCtx.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
         "org.apache.derby.jdbc.EmbeddedDriver");
 
-    derbyProps.put(ConfigurationConstants.CONFIG_PASSWORD, "");
-    derbyProps.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
+    derbyCtx.put(ConfigurationConstants.CONFIG_PASSWORD, "");
+    derbyCtx.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
 
     File tmpDir = new File("target/test");
     tmpDir.mkdirs();
@@ -78,17 +78,17 @@ public class TestJdbcChannelProvider {
       derbyDbDir.mkdirs();
     }
 
-    derbyProps.put(ConfigurationConstants.CONFIG_URL,
+    derbyCtx.put(ConfigurationConstants.CONFIG_URL,
         "jdbc:derby:" + derbyDbDir.getCanonicalPath() + "/db;create=true");
 
-    LOGGER.info("Derby Properties: " + derbyProps);
+    LOGGER.info("Derby Properties: " + derbyCtx);
   }
 
   @Test
   public void testDerbySetup() {
     provider = new JdbcChannelProviderImpl();
 
-    provider.initialize(derbyProps);
+    provider.initialize(derbyCtx);
 
     Transaction tx1 = provider.getTransaction();
     tx1.begin();
@@ -117,7 +117,7 @@ public class TestJdbcChannelProvider {
   @Test
   public void testEventWithSimulatedSourceAndSinks() throws Exception {
     provider = new JdbcChannelProviderImpl();
-    provider.initialize(derbyProps);
+    provider.initialize(derbyCtx);
 
     Map<String, List<MockEvent>> eventMap =
         new HashMap<String, List<MockEvent>>();
@@ -173,7 +173,7 @@ public class TestJdbcChannelProvider {
   @Test
   public void testPeristingEvents() {
     provider = new JdbcChannelProviderImpl();
-    provider.initialize(derbyProps);
+    provider.initialize(derbyCtx);
 
     Map<String, List<MockEvent>> eventMap =
         new HashMap<String, List<MockEvent>>();

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java Fri Oct 28 21:51:02 2011
@@ -23,6 +23,23 @@ public class Context {
     return null;
   }
 
+  public <T> T get(String key, Class<? extends T> clazz, T defaultValue) {
+    T result = get(key, clazz);
+    if (result == null) {
+      result = defaultValue;
+    }
+
+    return result;
+  }
+
+  public String getString(String key) {
+    return get(key, String.class);
+  }
+
+  public String getString(String key, String defaultValue) {
+    return get(key, String.class, defaultValue);
+  }
+
   @Override
   public String toString() {
     return "{ parameters:" + parameters + " }";
@@ -36,4 +53,7 @@ public class Context {
     this.parameters = parameters;
   }
 
+  public void clear() {
+    parameters.clear();
+  }
 }