You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/28 23:51:02 UTC
svn commit: r1190609 - in /incubator/flume/branches/flume-728:
flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/
flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/
flume-ng-channels/fl...
Author: arvind
Date: Fri Oct 28 21:51:02 2011
New Revision: 1190609
URL: http://svn.apache.org/viewvc?rev=1190609&view=rev
Log:
FLUME-817. JdbcChannel cannot be created by DefaultChannelFactory.
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Fri Oct 28 21:51:02 2011
@@ -17,33 +17,28 @@
*/
package org.apache.flume.channel.jdbc;
-import java.util.Properties;
-
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
import org.apache.log4j.Logger;
/**
* <p>A JDBC based channel implementation.</p>
*/
-public class JdbcChannel implements Channel {
+public class JdbcChannel implements Channel, Configurable {
private static final Logger LOG = Logger.getLogger(JdbcChannel.class);
- private final JdbcChannelProvider provider;
- private final String name;
+ private JdbcChannelProvider provider;
+ private String name;
/**
- * Instantiates a new JDBC Channel with the given properties.
- * @param configuration
+ * Instantiates a new JDBC Channel.
*/
- public JdbcChannel(String name, Properties configuration) {
- provider = JdbcChannelProviderFactory.getProvider(configuration);
- this.name = name;
-
- LOG.info("JDBC Channel initialized: " + name);
+ public JdbcChannel() {
}
@Override
@@ -63,7 +58,9 @@ public class JdbcChannel implements Chan
@Override
public void shutdown() {
- // TODO Auto-generated method stub
+ JdbcChannelProviderFactory.releaseProvider(name);
+ provider = null;
+ name = null;
}
@Override
@@ -74,4 +71,14 @@ public class JdbcChannel implements Chan
private JdbcChannelProvider getProvider() {
return provider;
}
+
+ @Override
+ public void configure(Context context) {
+ // FIXME - allow name to be specified via the context
+ this.name = "jdbc";
+
+ provider = JdbcChannelProviderFactory.getProvider(context, name);
+
+ LOG.info("JDBC Channel initialized: " + name);
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java Fri Oct 28 21:51:02 2011
@@ -17,8 +17,7 @@
*/
package org.apache.flume.channel.jdbc;
-import java.util.Properties;
-
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
@@ -32,7 +31,7 @@ public interface JdbcChannelProvider {
* the channel can be used in any way.
* @param properties the configuration for the system
*/
- public void initialize(Properties properties);
+ public void initialize(Context context);
/**
* Deinitializes the channel provider. Once this method is called, the
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProviderFactory.java Fri Oct 28 21:51:02 2011
@@ -1,13 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flume.channel.jdbc;
-import java.util.Properties;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
public final class JdbcChannelProviderFactory {
- public static JdbcChannelProvider getProvider(Properties properties) {
- return null;
+ private static Set<String> INSTANCES = new HashSet<String>();
+ private static JdbcChannelProvider PROVIDER;
+
+ public static synchronized JdbcChannelProvider getProvider(
+ Context context, String name) {
+ if (PROVIDER == null) {
+ PROVIDER = new JdbcChannelProviderImpl();
+ PROVIDER.initialize(context);
+ }
+
+ if (!INSTANCES.add(name)) {
+ throw new JdbcChannelException("Attempt to initialize multiple "
+ + "channels with same name: " + name);
+ }
+
+ return PROVIDER;
}
+ public static synchronized void releaseProvider(String name) {
+ if (!INSTANCES.remove(name)) {
+ throw new JdbcChannelException("Attempt to release non-existant channel: "
+ + name);
+ }
+
+ if (INSTANCES.size() == 0) {
+ // Deinitialize the provider
+ PROVIDER.close();
+ PROVIDER = null;
+ }
+ }
private JdbcChannelProviderFactory() {
// disable explicit object creation
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Oct 28 21:51:02 2011
@@ -35,8 +35,8 @@ import org.apache.commons.dbcp.PoolingDa
import org.apache.commons.pool.KeyedObjectPoolFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.ConfigurationConstants;
import org.apache.flume.channel.jdbc.DatabaseType;
import org.apache.flume.channel.jdbc.JdbcChannelException;
@@ -85,18 +85,18 @@ public class JdbcChannelProviderImpl imp
private String driverClassName;
@Override
- public void initialize(Properties properties) {
+ public void initialize(Context context) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Initializing JDBC Channel provider with props: "
- + properties);
+ + context);
}
- initializeDataSource(properties);
- initializeSchema(properties);
+ initializeDataSource(context);
+ initializeSchema(context);
}
- private void initializeSchema(Properties properties) {
- String createSchemaFlag = properties.getProperty(
+ private void initializeSchema(Context context) {
+ String createSchemaFlag = context.getString(
ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
boolean createSchema = Boolean.valueOf(createSchemaFlag);
@@ -225,23 +225,21 @@ public class JdbcChannelProviderImpl imp
* Initializes the datasource and the underlying connection pool.
* @param properties
*/
- private void initializeDataSource(Properties properties) {
- driverClassName = properties.getProperty(
+ private void initializeDataSource(Context context) {
+ driverClassName = context.getString(
ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS);
- connectUrl = properties.getProperty(ConfigurationConstants.CONFIG_URL);
+ connectUrl = context.getString(ConfigurationConstants.CONFIG_URL);
- String userName =
- properties.getProperty(ConfigurationConstants.CONFIG_USERNAME);
+ String userName = context.getString(ConfigurationConstants.CONFIG_USERNAME);
- String password =
- properties.getProperty(ConfigurationConstants.CONFIG_PASSWORD);
+ String password = context.getString(ConfigurationConstants.CONFIG_PASSWORD);
- String jdbcPropertiesFile = properties.getProperty(
+ String jdbcPropertiesFile = context.getString(
ConfigurationConstants.CONFIG_JDBC_PROPERTIES_FILE);
- String dbTypeName = properties.getProperty(
+ String dbTypeName = context.getString(
ConfigurationConstants.CONFIG_DATABASE_TYPE);
// If connect URL is not specified, use embedded Derby
@@ -379,7 +377,7 @@ public class JdbcChannelProviderImpl imp
}
// Transaction Isolation
- String txIsolation = properties.getProperty(
+ String txIsolation = context.getString(
ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
TransactionIsolation.READ_COMMITTED.getName());
@@ -394,7 +392,7 @@ public class JdbcChannelProviderImpl imp
connectionPool = new GenericObjectPool();
- String maxActiveConnections = properties.getProperty(
+ String maxActiveConnections = context.getString(
ConfigurationConstants.CONFIG_MAX_CONNECTION, "10");
int maxActive = 10;
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 28 21:51:02 2011
@@ -32,8 +32,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
@@ -49,20 +49,20 @@ public class TestJdbcChannelProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(TestJdbcChannelProvider.class);
- private Properties derbyProps = new Properties();
+ private Context derbyCtx = new Context();
private File derbyDbDir;
private JdbcChannelProviderImpl provider;
@Before
public void setUp() throws IOException {
- derbyProps.clear();
- derbyProps.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
- derbyProps.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
- derbyProps.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
+ derbyCtx.clear();
+ derbyCtx.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
+ derbyCtx.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
+ derbyCtx.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
"org.apache.derby.jdbc.EmbeddedDriver");
- derbyProps.put(ConfigurationConstants.CONFIG_PASSWORD, "");
- derbyProps.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
+ derbyCtx.put(ConfigurationConstants.CONFIG_PASSWORD, "");
+ derbyCtx.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
File tmpDir = new File("target/test");
tmpDir.mkdirs();
@@ -78,17 +78,17 @@ public class TestJdbcChannelProvider {
derbyDbDir.mkdirs();
}
- derbyProps.put(ConfigurationConstants.CONFIG_URL,
+ derbyCtx.put(ConfigurationConstants.CONFIG_URL,
"jdbc:derby:" + derbyDbDir.getCanonicalPath() + "/db;create=true");
- LOGGER.info("Derby Properties: " + derbyProps);
+ LOGGER.info("Derby Properties: " + derbyCtx);
}
@Test
public void testDerbySetup() {
provider = new JdbcChannelProviderImpl();
- provider.initialize(derbyProps);
+ provider.initialize(derbyCtx);
Transaction tx1 = provider.getTransaction();
tx1.begin();
@@ -117,7 +117,7 @@ public class TestJdbcChannelProvider {
@Test
public void testEventWithSimulatedSourceAndSinks() throws Exception {
provider = new JdbcChannelProviderImpl();
- provider.initialize(derbyProps);
+ provider.initialize(derbyCtx);
Map<String, List<MockEvent>> eventMap =
new HashMap<String, List<MockEvent>>();
@@ -173,7 +173,7 @@ public class TestJdbcChannelProvider {
@Test
public void testPeristingEvents() {
provider = new JdbcChannelProviderImpl();
- provider.initialize(derbyProps);
+ provider.initialize(derbyCtx);
Map<String, List<MockEvent>> eventMap =
new HashMap<String, List<MockEvent>>();
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1190609&r1=1190608&r2=1190609&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java Fri Oct 28 21:51:02 2011
@@ -23,6 +23,23 @@ public class Context {
return null;
}
+ public <T> T get(String key, Class<? extends T> clazz, T defaultValue) {
+ T result = get(key, clazz);
+ if (result == null) {
+ result = defaultValue;
+ }
+
+ return result;
+ }
+
+ public String getString(String key) {
+ return get(key, String.class);
+ }
+
+ public String getString(String key, String defaultValue) {
+ return get(key, String.class, defaultValue);
+ }
+
@Override
public String toString() {
return "{ parameters:" + parameters + " }";
@@ -36,4 +53,7 @@ public class Context {
this.parameters = parameters;
}
+ public void clear() {
+ parameters.clear();
+ }
}