You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:34 UTC
[08/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
index 2c71bc9..2c516f9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
@@ -27,44 +27,45 @@ public abstract class CipherProvider {
public abstract Encryptor.Builder<?> newEncryptorBuilder();
public abstract Decryptor.Builder<?> newDecryptorBuilder();
- public static abstract class Encryptor {
+ public abstract static class Encryptor {
public abstract byte[] encrypt(byte[] clearText);
public abstract byte[] getParameters();
public abstract String getCodec();
/** Builder implementations MUST have a no-arg constructor */
- public static abstract class Builder<T extends Encryptor> {
+ public abstract static class Builder<T extends Encryptor> {
protected Key key;
+
public Builder<T> setKey(Key key) {
this.key = Preconditions.checkNotNull(key, "key cannot be null");
return this;
}
+
public abstract T build();
}
-
}
- public static abstract class Decryptor {
+ public abstract static class Decryptor {
public abstract byte[] decrypt(byte[] cipherText);
public abstract String getCodec();
/** Builder implementations MUST have a no-arg constructor */
- public static abstract class Builder<T extends Decryptor> {
+ public abstract static class Builder<T extends Decryptor> {
protected byte[] parameters;
protected Key key;
+
public Builder<T> setKey(Key key) {
this.key = Preconditions.checkNotNull(key, "key cannot be null");
return this;
}
+
public Builder<T> setParameters(byte[] parameters) {
this.parameters = parameters;
return this;
}
+
public abstract T build();
}
-
}
-
-
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
index ca11f6b..85b0fbb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
@@ -33,7 +33,7 @@ public class CipherProviderFactory {
public static CipherProvider.Encryptor getEncrypter(String cipherProviderType,
Key key) {
- if(cipherProviderType == null) {
+ if (cipherProviderType == null) {
return null;
}
CipherProvider provider = getProvider(cipherProviderType);
@@ -41,7 +41,7 @@ public class CipherProviderFactory {
}
public static CipherProvider.Decryptor getDecrypter(String cipherProviderType,
Key key, byte[] parameters) {
- if(cipherProviderType == null) {
+ if (cipherProviderType == null) {
return null;
}
CipherProvider provider = getProvider(cipherProviderType);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
index 87834d7..73bc720 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
@@ -18,7 +18,6 @@
*/
package org.apache.flume.channel.file.encryption;
-
public enum CipherProviderType {
AESCTRNOPADDING(AESCTRNoPaddingProvider.class),
OTHER(null);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
index 0155c39..beffd9e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
@@ -27,7 +27,6 @@ import org.apache.flume.FlumeException;
public class DecryptionFailureException extends FlumeException {
private static final long serialVersionUID = 6646810195384793646L;
-
public DecryptionFailureException(String msg) {
super(msg);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
index f961ef9..c96cf0a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
@@ -55,7 +55,7 @@ public class JCEFileKeyProvider extends KeyProvider {
keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8)
.trim().toCharArray();
ks.load(new FileInputStream(keyStoreFile), keyStorePassword);
- } catch(Exception ex) {
+ } catch (Exception ex) {
throw Throwables.propagate(ex);
}
}
@@ -65,14 +65,14 @@ public class JCEFileKeyProvider extends KeyProvider {
String passwordFile = keyStorePasswordFile.getAbsolutePath();
try {
char[] keyPassword = keyStorePassword;
- if(aliasPasswordFileMap.containsKey(alias)) {
+ if (aliasPasswordFileMap.containsKey(alias)) {
File keyPasswordFile = aliasPasswordFileMap.get(alias);
keyPassword = Files.toString(keyPasswordFile,
Charsets.UTF_8).trim().toCharArray();
passwordFile = keyPasswordFile.getAbsolutePath();
}
Key key = ks.getKey(alias, keyPassword);
- if(key == null) {
+ if (key == null) {
throw new IllegalStateException("KeyStore returned null for " + alias);
}
return key;
@@ -99,13 +99,13 @@ public class JCEFileKeyProvider extends KeyProvider {
EncryptionConfiguration.JCE_FILE_KEYS);
Preconditions.checkState(!Strings.isNullOrEmpty(passwordProtectedKeys),
"Keys available to KeyStore was not specified or empty");
- for(String passwordName : passwordProtectedKeys.trim().split("\\s+")) {
+ for (String passwordName : passwordProtectedKeys.trim().split("\\s+")) {
String propertyName = Joiner.on(".").join(EncryptionConfiguration.JCE_FILE_KEYS,
passwordName, EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
String passwordFileName = context.getString(propertyName,
keyStorePasswordFileName);
File passwordFile = new File(passwordFileName.trim());
- if(passwordFile.isFile()) {
+ if (passwordFile.isFile()) {
aliasPasswordFileMap.put(passwordName, passwordFile);
} else {
logger.warn("Password file for alias " + passwordName +
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
index 0fef6dc..3263615 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
@@ -18,7 +18,6 @@
*/
package org.apache.flume.channel.file.encryption;
-
public enum KeyProviderType {
JCEKSFILE(JCEFileKeyProvider.Builder.class),
OTHER(null);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index 34f93d9..50492cc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -5010,18 +5010,21 @@ public final class ProtosFactory {
public interface RollbackOrBuilder
extends com.google.protobuf.MessageOrBuilder {
}
+
/**
* Protobuf type {@code Rollback}
*/
- public static final class Rollback extends
- com.google.protobuf.GeneratedMessage
- implements RollbackOrBuilder {
+ public static final class Rollback extends com.google.protobuf.GeneratedMessage
+ implements RollbackOrBuilder {
// Use Rollback.newBuilder() to construct.
private Rollback(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
- private Rollback(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private Rollback(boolean noInit) {
+ this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance();
+ }
private static final Rollback defaultInstance;
public static Rollback getDefaultInstance() {
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
index 3b97684..7138b41 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
@@ -89,7 +89,6 @@ public final class ConfigurationConstants {
public static final String OLD_CONFIG_CREATE_SCHEMA =
PREFIX + CONFIG_CREATE_SCHEMA;
-
public static final String CONFIG_CREATE_INDEX = "create.index";
/**
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
index 1192452..fba6e7b 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
@@ -27,6 +27,7 @@ import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.AbstractChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* <p>
* A JDBC based channel implementation.
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
index e445d61..76bc627 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
@@ -29,7 +29,7 @@ public interface JdbcChannelProvider {
/**
* Initializes the channel provider. This method must be called before
* the channel can be used in any way.
- * @param properties the configuration for the system
+ * @param context the configuration for the system
*/
public void initialize(Context context);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
index 2dc3fcc..56eebfd 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
@@ -127,7 +127,6 @@ public class DerbySchemaHandler implements SchemaHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(DerbySchemaHandler.class);
-
private static final String QUREY_SYSCHEMA_FLUME
= "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'";
@@ -613,15 +612,15 @@ public class DerbySchemaHandler implements SchemaHandler {
// Persist the payload spill
if (hasSpillPayload) {
- spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
- spillEventStmt.setLong(1, eventId);
- spillEventStmt.setBinaryStream(2,
- new ByteArrayInputStream(spillPayload), spillPayload.length);
- int spillEventCount = spillEventStmt.executeUpdate();
- if (spillEventCount != 1) {
- throw new JdbcChannelException("Invalid update count on spill "
- + "event insert: " + spillEventCount);
- }
+ spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
+ spillEventStmt.setLong(1, eventId);
+ spillEventStmt.setBinaryStream(2,
+ new ByteArrayInputStream(spillPayload), spillPayload.length);
+ int spillEventCount = spillEventStmt.executeUpdate();
+ if (spillEventCount != 1) {
+ throw new JdbcChannelException("Invalid update count on spill "
+ + "event insert: " + spillEventCount);
+ }
}
// Persist the headers
@@ -645,8 +644,7 @@ public class DerbySchemaHandler implements SchemaHandler {
int updateCount = baseHeaderStmt.executeUpdate();
if (updateCount != 1) {
- throw new JdbcChannelException("Unexpected update header count: "
- + updateCount);
+ throw new JdbcChannelException("Unexpected update header count: " + updateCount);
}
ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
if (!headerIdResultSet.next()) {
@@ -705,7 +703,7 @@ public class DerbySchemaHandler implements SchemaHandler {
headerValueSpillStmt =
connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
- for(HeaderEntry entry : headerWithValueSpill) {
+ for (HeaderEntry entry : headerWithValueSpill) {
String valueSpill = entry.getValue().getSpill();
headerValueSpillStmt.setLong(1, entry.getId());
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
index f42b4dd..845b794 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
@@ -54,12 +54,10 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcChannelProviderImpl.class);
-
private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME
- = "org.apache.derby.jdbc.EmbeddedDriver";
+ = "org.apache.derby.jdbc.EmbeddedDriver";
- private static final String DEFAULT_DRIVER_CLASSNAME
- = EMBEDDED_DERBY_DRIVER_CLASSNAME;
+ private static final String DEFAULT_DRIVER_CLASSNAME = EMBEDDED_DERBY_DRIVER_CLASSNAME;
private static final String DEFAULT_USERNAME = "sa";
private static final String DEFAULT_PASSWORD = "";
private static final String DEFAULT_DBTYPE = "DERBY";
@@ -133,7 +131,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
for (String key: sysProps.keySet()) {
String value = sysProps.get(key);
- if(key != null && value != null) {
+ if (key != null && value != null) {
System.setProperty(key, value);
}
}
@@ -254,7 +252,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
int index = connectUrl.indexOf(";");
String baseUrl = null;
if (index != -1) {
- baseUrl = connectUrl.substring(0, index+1);
+ baseUrl = connectUrl.substring(0, index + 1);
} else {
baseUrl = connectUrl + ";";
}
@@ -440,12 +438,12 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
databaseType = DatabaseType.getByName(dbTypeName);
switch (databaseType) {
- case DERBY:
- case MYSQL:
- break;
- default:
- throw new JdbcChannelException("Database " + databaseType
- + " not supported at this time");
+ case DERBY:
+ case MYSQL:
+ break;
+ default:
+ throw new JdbcChannelException("Database " + databaseType
+ + " not supported at this time");
}
// Register driver
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
index 13b14f5..6f3aecd 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
@@ -28,7 +28,6 @@ import org.apache.flume.channel.jdbc.JdbcChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class JdbcTransactionImpl implements Transaction {
private static final Logger LOGGER =
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
index dba96fc..9bfc227 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
@@ -26,14 +26,12 @@ import java.sql.Connection;
public interface SchemaHandler {
/**
- * @param connection the connection to check for schema.
* @return true if the schema exists. False otherwise.
*/
public boolean schemaExists();
/**
* Validates the schema.
- * @param connection
*/
public void validateSchema();
@@ -74,8 +72,6 @@ public interface SchemaHandler {
* must have an active transaction ongoing. This allows the provider impl to
* enforce channel capacity limits when persisting events.
* @return the current size of the channel.
- * @param connection
- * @return
*/
public long getChannelSize(Connection connection);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
index 2543848..35f4c61 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
@@ -27,19 +27,17 @@ import org.apache.flume.channel.jdbc.JdbcChannelException;
*/
public final class SchemaHandlerFactory {
- public static SchemaHandler getHandler(DatabaseType dbType,
- DataSource dataSource) {
+ public static SchemaHandler getHandler(DatabaseType dbType, DataSource dataSource) {
SchemaHandler handler = null;
- switch(dbType) {
- case DERBY:
- handler = new DerbySchemaHandler(dataSource);
- break;
- case MYSQL:
- handler = new MySQLSchemaHandler(dataSource);
- break;
- default:
- throw new JdbcChannelException("Database " + dbType
- + " not supported yet");
+ switch (dbType) {
+ case DERBY:
+ handler = new DerbySchemaHandler(dataSource);
+ break;
+ case MYSQL:
+ handler = new MySQLSchemaHandler(dataSource);
+ break;
+ default:
+ throw new JdbcChannelException("Database " + dbType + " not supported yet");
}
return handler;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index dfc95bc..90e3288 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -73,7 +73,7 @@ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
public class KafkaChannel extends BasicChannelSemantics {
- private final static Logger logger =
+ private static final Logger logger =
LoggerFactory.getLogger(KafkaChannel.class);
private final Properties consumerProps = new Properties();
@@ -97,27 +97,27 @@ public class KafkaChannel extends BasicChannelSemantics {
private KafkaChannelCounter counter;
- /* Each Consumer commit will commit all partitions owned by it. To
- * ensure that each partition is only committed when all events are
- * actually done, we will need to keep a Consumer per thread.
- */
+ /* Each Consumer commit will commit all partitions owned by it. To
+ * ensure that each partition is only committed when all events are
+ * actually done, we will need to keep a Consumer per thread.
+ */
- private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new
- ThreadLocal<ConsumerAndRecords>() {
- @Override
- public ConsumerAndRecords initialValue() {
- return createConsumerAndRecords();
- }
- };
+ private final ThreadLocal<ConsumerAndRecords> consumerAndRecords =
+ new ThreadLocal<ConsumerAndRecords>() {
+ @Override
+ public ConsumerAndRecords initialValue() {
+ return createConsumerAndRecords();
+ }
+ };
@Override
public void start() {
- logger.info("Starting Kafka Channel: {}", getName());
- producer = new KafkaProducer<String, byte[]>(producerProps);
- // We always have just one topic being read by one thread
- logger.info("Topic = {}", topic.get());
- counter.start();
- super.start();
+ logger.info("Starting Kafka Channel: {}", getName());
+ producer = new KafkaProducer<String, byte[]>(producerProps);
+ // We always have just one topic being read by one thread
+ logger.info("Topic = {}", topic.get());
+ counter.start();
+ super.start();
}
@Override
@@ -185,17 +185,19 @@ public class KafkaChannel extends BasicChannelSemantics {
throw new ConfigurationException("Bootstrap Servers must be specified");
} else {
ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
- logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+ logger.warn("{} is deprecated. Please use the parameter {}",
+ BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
}
}
//GroupId
// If there is an old Group Id set, then use that if no groupId is set.
if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
- String oldGroupId = ctx.getString(GROUP_ID_FLUME);
- if ( oldGroupId != null && !oldGroupId.isEmpty()) {
+ String oldGroupId = ctx.getString(GROUP_ID_FLUME);
+ if (oldGroupId != null && !oldGroupId.isEmpty()) {
ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
- logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+ logger.warn("{} is deprecated. Please use the parameter {}",
+ GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
}
}
@@ -209,7 +211,9 @@ public class KafkaChannel extends BasicChannelSemantics {
auto = "latest";
}
ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
- logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ logger.warn("{} is deprecated. Please use the parameter {}",
+ READ_SMALLEST_OFFSET,
+ KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
}
}
@@ -249,15 +253,17 @@ public class KafkaChannel extends BasicChannelSemantics {
logger.info(consumerProps.toString());
}
- protected Properties getConsumerProps() { return consumerProps; }
-
+ protected Properties getConsumerProps() {
+ return consumerProps;
+ }
private synchronized ConsumerAndRecords createConsumerAndRecords() {
try {
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
logger.info("Created new consumer to connect to Kafka");
- car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
+ car.consumer.subscribe(Arrays.asList(topic.get()),
+ new ChannelRebalanceListener(rebalanceFlag));
car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
consumers.add(car);
return car;
@@ -286,14 +292,14 @@ public class KafkaChannel extends BasicChannelSemantics {
NONE
}
-
private class KafkaTransaction extends BasicTransactionSemantics {
private TransactionType type = TransactionType.NONE;
private Optional<ByteArrayOutputStream> tempOutStream = Optional
.absent();
// For put transactions, serialize the events and hold them until the commit goes is requested.
- private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent();
+ private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords =
+ Optional.absent();
// For take transactions, deserialize and hold them till commit goes through
private Optional<LinkedList<Event>> events = Optional.absent();
private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
@@ -323,8 +329,9 @@ public class KafkaChannel extends BasicChannelSemantics {
}
String key = event.getHeaders().get(KEY_HEADER);
try {
- producerRecords.get().add(new ProducerRecord<String, byte[]>
- (topic.get(), key, serializeValue(event, parseAsFlumeEvent)));
+ producerRecords.get().add(
+ new ProducerRecord<String, byte[]>(topic.get(), key,
+ serializeValue(event, parseAsFlumeEvent)));
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
@@ -382,7 +389,8 @@ public class KafkaChannel extends BasicChannelSemantics {
}
if (logger.isDebugEnabled()) {
- logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
+ logger.debug("Processed output from partition {} offset {}",
+ record.partition(), record.offset());
}
long endTime = System.nanoTime();
@@ -391,10 +399,10 @@ public class KafkaChannel extends BasicChannelSemantics {
return null;
}
} catch (Exception ex) {
- logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " +
- "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex);
- throw new ChannelException("Error while getting events from Kafka",
- ex);
+ logger.warn("Error while getting events from Kafka. This is usually caused by " +
+ "trying to read a non-flume event. Ensure the setting for " +
+ "parseAsFlumeEvent is correct", ex);
+ throw new ChannelException("Error while getting events from Kafka", ex);
}
}
eventTaken = true;
@@ -564,8 +572,9 @@ public class KafkaChannel extends BasicChannelSemantics {
StringBuilder sb = new StringBuilder();
for (TopicPartition tp : this.consumer.assignment()) {
try {
- sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset())
- .append(",").append(this.consumer.committed(tp).metadata()).append("]");
+ sb.append("Committed: [").append(tp).append(",")
+ .append(this.consumer.committed(tp).offset())
+ .append(",").append(this.consumer.committed(tp).metadata()).append("]");
if (logger.isDebugEnabled()) {
logger.debug(sb.toString());
}
@@ -596,8 +605,8 @@ class ChannelCallback implements Callback {
}
if (log.isDebugEnabled()) {
long batchElapsedTime = System.currentTimeMillis() - startTime;
- log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" +
- metadata.offset() + "-" + batchElapsedTime);
+ log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
+ metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index faf46b6..ccf46d9 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -26,12 +26,17 @@ public class KafkaChannelConfiguration {
public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
public static final String DEFAULT_ACKS = "all";
- public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
- public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+ public static final String DEFAULT_KEY_SERIALIZER =
+ "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String DEFAULT_VALUE_SERIAIZER =
+ "org.apache.kafka.common.serialization.ByteArraySerializer";
+ public static final String DEFAULT_KEY_DESERIALIZER =
+ "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String DEFAULT_VALUE_DESERIAIZER =
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer";
public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
- public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String BOOTSTRAP_SERVERS_CONFIG =
+ KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
public static final String DEFAULT_TOPIC = "flume-channel";
public static final String DEFAULT_GROUP_ID = "flume";
public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
index bdf42cd..b46d646 100644
--- a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
+++ b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
@@ -18,26 +18,26 @@
*/
package org.apache.flume.channel;
-import java.util.ArrayDeque;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.GuardedBy;
-
import com.google.common.annotations.VisibleForTesting;
-import org.apache.flume.*;
-import org.apache.flume.annotations.Recyclable;
-
+import com.google.common.base.Preconditions;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Recyclable;
import org.apache.flume.channel.file.FileChannel;
import org.apache.flume.instrumentation.ChannelCounter;
-
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.ArrayDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -50,28 +50,45 @@ import com.google.common.base.Preconditions;
@Recyclable
public class SpillableMemoryChannel extends FileChannel {
// config settings
- /** Max number of events to be stored in memory */
+ /**
+ * Max number of events to be stored in memory
+ */
public static final String MEMORY_CAPACITY = "memoryCapacity";
- /** Seconds to wait before enabling disk overflow when memory fills up */
+ /**
+ * Seconds to wait before enabling disk overflow when memory fills up
+ */
public static final String OVERFLOW_TIMEOUT = "overflowTimeout";
- /** Internal use only. To remain undocumented in User guide. Determines the
+ /**
+ * Internal use only. To remain undocumented in User guide. Determines the
* percent free space available in mem queue when we stop spilling to overflow
*/
public static final String OVERFLOW_DEACTIVATION_THRESHOLD
- = "overflowDeactivationThreshold";
- /** percent of buffer between byteCapacity and the estimated event size. */
+ = "overflowDeactivationThreshold";
+ /**
+ * percent of buffer between byteCapacity and the estimated event size.
+ */
public static final String BYTE_CAPACITY_BUFFER_PERCENTAGE
- = "byteCapacityBufferPercentage";
+ = "byteCapacityBufferPercentage";
- /** max number of bytes used for all events in the queue. */
+ /**
+ * max number of bytes used for all events in the queue.
+ */
public static final String BYTE_CAPACITY = "byteCapacity";
- /** max number of events in overflow. */
+ /**
+ * max number of events in overflow.
+ */
public static final String OVERFLOW_CAPACITY = "overflowCapacity";
- /** file channel setting that is overriden by Spillable Channel */
+ /**
+ * file channel setting that is overriden by Spillable Channel
+ */
public static final String KEEP_ALIVE = "keep-alive";
- /** file channel capacity overridden by Spillable Channel */
+ /**
+ * file channel capacity overridden by Spillable Channel
+ */
public static final String CAPACITY = "capacity";
- /** Estimated average size of events expected to be in the channel */
+ /**
+ * Estimated average size of events expected to be in the channel
+ */
public static final String AVG_EVENT_SIZE = "avgEventSize";
private static Logger LOGGER = LoggerFactory.getLogger(SpillableMemoryChannel.class);
@@ -84,7 +101,7 @@ public class SpillableMemoryChannel extends FileChannel {
// memory consumption control
private static final int defaultAvgEventSize = 500;
private static final Long defaultByteCapacity
- = (long)(Runtime.getRuntime().maxMemory() * .80);
+ = (long) (Runtime.getRuntime().maxMemory() * .80);
private static final int defaultByteCapacityBufferPercentage = 20;
private volatile int byteCapacity;
@@ -94,7 +111,7 @@ public class SpillableMemoryChannel extends FileChannel {
private Semaphore bytesRemaining;
// for synchronizing access to primary/overflow channels & drain order
- final private Object queueLock = new Object();
+ private final Object queueLock = new Object();
@GuardedBy(value = "queueLock")
public ArrayDeque<Event> memQueue;
@@ -109,8 +126,10 @@ public class SpillableMemoryChannel extends FileChannel {
private int maxMemQueueSize = 0; // max sie of memory Queue
- private boolean overflowDisabled; // if true indicates the overflow should not be used at all.
- private boolean overflowActivated=false; // indicates if overflow can be used. invariant: false if overflowDisabled is true.
+ private boolean overflowDisabled; // if true indicates the overflow should not be used at all.
+
+ // indicates if overflow can be used. invariant: false if overflowDisabled is true.
+ private boolean overflowActivated = false;
// if true overflow can be used. invariant: false if overflowDisabled is true.
private int memoryCapacity = -1; // max events that the channel can hold in memory
@@ -120,7 +139,7 @@ public class SpillableMemoryChannel extends FileChannel {
// mem full % at which we stop spill to overflow
private double overflowDeactivationThreshold
- = defaultOverflowDeactivationThreshold / 100;
+ = defaultOverflowDeactivationThreshold / 100;
public SpillableMemoryChannel() {
super();
@@ -133,6 +152,7 @@ public class SpillableMemoryChannel extends FileChannel {
public int getMemoryCapacity() {
return memoryCapacity;
}
+
public int getOverflowTimeout() {
return overflowTimeout;
}
@@ -160,7 +180,6 @@ public class SpillableMemoryChannel extends FileChannel {
}
}
-
private static class MutableInteger {
private int value;
@@ -186,7 +205,7 @@ public class SpillableMemoryChannel extends FileChannel {
public int totalPuts = 0; // for debugging only
private long overflowCounter = 0; // # of items in overflow channel
- public String dump() {
+ public String dump() {
StringBuilder sb = new StringBuilder();
sb.append(" [ ");
@@ -195,12 +214,12 @@ public class SpillableMemoryChannel extends FileChannel {
sb.append(" ");
}
sb.append("]");
- return sb.toString();
+ return sb.toString();
}
public void putPrimary(Integer eventCount) {
totalPuts += eventCount;
- if ( (queue.peekLast() == null) || queue.getLast().intValue() < 0) {
+ if ((queue.peekLast() == null) || queue.getLast().intValue() < 0) {
queue.addLast(new MutableInteger(eventCount));
} else {
queue.getLast().add(eventCount);
@@ -208,7 +227,7 @@ public class SpillableMemoryChannel extends FileChannel {
}
public void putFirstPrimary(Integer eventCount) {
- if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {
+ if ((queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {
queue.addFirst(new MutableInteger(eventCount));
} else {
queue.getFirst().add(eventCount);
@@ -217,7 +236,7 @@ public class SpillableMemoryChannel extends FileChannel {
public void putOverflow(Integer eventCount) {
totalPuts += eventCount;
- if ( (queue.peekLast() == null) || queue.getLast().intValue() > 0) {
+ if ((queue.peekLast() == null) || queue.getLast().intValue() > 0) {
queue.addLast(new MutableInteger(-eventCount));
} else {
queue.getLast().add(-eventCount);
@@ -226,9 +245,9 @@ public class SpillableMemoryChannel extends FileChannel {
}
public void putFirstOverflow(Integer eventCount) {
- if ( (queue.peekFirst() == null) || queue.getFirst().intValue() > 0) {
+ if ((queue.peekFirst() == null) || queue.getFirst().intValue() > 0) {
queue.addFirst(new MutableInteger(-eventCount));
- } else {
+ } else {
queue.getFirst().add(-eventCount);
}
overflowCounter += eventCount;
@@ -247,9 +266,9 @@ public class SpillableMemoryChannel extends FileChannel {
// this condition is optimization to avoid redundant conversions of
// int -> Integer -> string in hot path
- if (headValue.intValue() < takeCount) {
+ if (headValue.intValue() < takeCount) {
throw new IllegalStateException("Cannot take " + takeCount +
- " from " + headValue.intValue() + " in DrainOrder Queue");
+ " from " + headValue.intValue() + " in DrainOrder Queue");
}
headValue.add(-takeCount);
@@ -260,9 +279,9 @@ public class SpillableMemoryChannel extends FileChannel {
public void takeOverflow(int takeCount) {
MutableInteger headValue = queue.getFirst();
- if(headValue.intValue() > -takeCount) {
+ if (headValue.intValue() > -takeCount) {
throw new IllegalStateException("Cannot take " + takeCount + " from "
- + headValue.intValue() + " in DrainOrder Queue head " );
+ + headValue.intValue() + " in DrainOrder Queue head ");
}
headValue.add(takeCount);
@@ -293,7 +312,6 @@ public class SpillableMemoryChannel extends FileChannel {
ArrayDeque<Event> putList;
private final ChannelCounter channelCounter;
-
public SpillableMemoryTransaction(ChannelCounter counter) {
takeList = new ArrayDeque<Event>(largestTakeTxSize);
putList = new ArrayDeque<Event>(largestPutTxSize);
@@ -307,26 +325,25 @@ public class SpillableMemoryChannel extends FileChannel {
@Override
public void close() {
- if (overflowTakeTx!=null) {
+ if (overflowTakeTx != null) {
overflowTakeTx.close();
}
- if (overflowPutTx!=null) {
+ if (overflowPutTx != null) {
overflowPutTx.close();
}
super.close();
}
-
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
putCalled = true;
- int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
+ int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize);
if (!putList.offer(event)) {
throw new ChannelFullException("Put queue in " + getName() +
- " channel's Transaction having capacity " + putList.size() +
- " full, consider reducing batch size of sources");
+ " channel's Transaction having capacity " + putList.size() +
+ " full, consider reducing batch size of sources");
}
putListByteCount += eventByteSize;
}
@@ -344,7 +361,7 @@ public class SpillableMemoryChannel extends FileChannel {
boolean takeSuceeded = false;
try {
Event event;
- synchronized(queueLock) {
+ synchronized (queueLock) {
int drainOrderTop = drainOrder.front();
if (!takeCalled) {
@@ -375,11 +392,11 @@ public class SpillableMemoryChannel extends FileChannel {
++takeCount;
drainOrder.takePrimary(1);
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite"
- + " semaphore signalling existence of entry");
+ + " semaphore signalling existence of entry");
}
}
- int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
+ int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize);
if (!useOverflow) {
// takeList is thd pvt, so no need to do this in synchronized block
takeList.offer(event);
@@ -389,7 +406,7 @@ public class SpillableMemoryChannel extends FileChannel {
takeSuceeded = true;
return event;
} finally {
- if(!takeSuceeded) {
+ if (!takeSuceeded) {
totalStored.release();
}
}
@@ -400,37 +417,35 @@ public class SpillableMemoryChannel extends FileChannel {
if (putCalled) {
putCommit();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Put Committed. Drain Order Queue state : "
- + drainOrder.dump());
+ LOGGER.debug("Put Committed. Drain Order Queue state : " + drainOrder.dump());
}
} else if (takeCalled) {
takeCommit();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Take Committed. Drain Order Queue state : "
- + drainOrder.dump());
+ LOGGER.debug("Take Committed. Drain Order Queue state : " + drainOrder.dump());
}
}
}
private void takeCommit() {
- if (takeCount > largestTakeTxSize)
+ if (takeCount > largestTakeTxSize) {
largestTakeTxSize = takeCount;
+ }
synchronized (queueLock) {
- if (overflowTakeTx!=null) {
+ if (overflowTakeTx != null) {
overflowTakeTx.commit();
}
- double memoryPercentFree = (memoryCapacity == 0) ? 0
- : (memoryCapacity - memQueue.size() + takeCount ) / (double)memoryCapacity ;
+ double memoryPercentFree = (memoryCapacity == 0) ? 0 :
+ (memoryCapacity - memQueue.size() + takeCount) / (double) memoryCapacity;
- if (overflowActivated
- && memoryPercentFree >= overflowDeactivationThreshold) {
+ if (overflowActivated && memoryPercentFree >= overflowDeactivationThreshold) {
overflowActivated = false;
LOGGER.info("Overflow Deactivated");
}
channelCounter.setChannelSize(getTotalStored());
}
- if (!useOverflow) {
+ if (!useOverflow) {
memQueRemaining.release(takeCount);
bytesRemaining.release(takeListByteCount);
}
@@ -440,30 +455,29 @@ public class SpillableMemoryChannel extends FileChannel {
private void putCommit() throws InterruptedException {
// decide if overflow needs to be used
- int timeout = overflowActivated ? 0 : overflowTimeout;
+ int timeout = overflowActivated ? 0 : overflowTimeout;
if (memoryCapacity != 0) {
// check for enough event slots(memoryCapacity) for using memory queue
if (!memQueRemaining.tryAcquire(putList.size(), timeout,
- TimeUnit.SECONDS)) {
+ TimeUnit.SECONDS)) {
if (overflowDisabled) {
throw new ChannelFullException("Spillable Memory Channel's " +
- "memory capacity has been reached and overflow is " +
- "disabled. Consider increasing memoryCapacity.");
+ "memory capacity has been reached and overflow is " +
+ "disabled. Consider increasing memoryCapacity.");
}
overflowActivated = true;
useOverflow = true;
- }
// check if we have enough byteCapacity for using memory queue
- else if (!bytesRemaining.tryAcquire(putListByteCount, overflowTimeout
- , TimeUnit.SECONDS)) {
+ } else if (!bytesRemaining.tryAcquire(putListByteCount,
+ overflowTimeout, TimeUnit.SECONDS)) {
memQueRemaining.release(putList.size());
if (overflowDisabled) {
throw new ChannelFullException("Spillable Memory Channel's "
- + "memory capacity has been reached. "
- + (bytesRemaining.availablePermits() * (int) avgEventSize)
- + " bytes are free and overflow is disabled. Consider "
- + "increasing byteCapacity or capacity.");
+ + "memory capacity has been reached. "
+ + (bytesRemaining.availablePermits() * (int) avgEventSize)
+ + " bytes are free and overflow is disabled. Consider "
+ + "increasing byteCapacity or capacity.");
}
overflowActivated = true;
useOverflow = true;
@@ -496,22 +510,21 @@ public class SpillableMemoryChannel extends FileChannel {
}
private void commitPutsToOverflow_core(Transaction overflowPutTx)
- throws InterruptedException {
+ throws InterruptedException {
// reattempt only once if overflow is full first time around
- for (int i = 0; i < 2; ++i) {
+ for (int i = 0; i < 2; ++i) {
try {
- synchronized(queueLock) {
+ synchronized (queueLock) {
overflowPutTx.commit();
drainOrder.putOverflow(putList.size());
channelCounter.setChannelSize(memQueue.size()
- + drainOrder.overflowCounter);
+ + drainOrder.overflowCounter);
break;
}
- } catch (ChannelFullException e) { // drop lock & reattempt
- if (i==0) {
- Thread.sleep(overflowTimeout *1000);
- }
- else {
+ } catch (ChannelFullException e) { // drop lock & reattempt
+ if (i == 0) {
+ Thread.sleep(overflowTimeout * 1000);
+ } else {
throw e;
}
}
@@ -523,14 +536,14 @@ public class SpillableMemoryChannel extends FileChannel {
for (Event e : putList) {
if (!memQueue.offer(e)) {
throw new ChannelException("Unable to insert event into memory " +
- "queue in spite of spare capacity, this is very unexpected");
+ "queue in spite of spare capacity, this is very unexpected");
}
}
drainOrder.putPrimary(putList.size());
- maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size()
- : maxMemQueueSize;
+ maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size()
+ : maxMemQueueSize;
channelCounter.setChannelSize(memQueue.size()
- + drainOrder.overflowCounter);
+ + drainOrder.overflowCounter);
}
// update counters and semaphores
totalStored.release(putList.size());
@@ -540,10 +553,10 @@ public class SpillableMemoryChannel extends FileChannel {
@Override
protected void doRollback() {
LOGGER.debug("Rollback() of " +
- (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
+ (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
if (putCalled) {
- if (overflowPutTx!=null) {
+ if (overflowPutTx != null) {
overflowPutTx.rollback();
}
if (!useOverflow) {
@@ -552,8 +565,8 @@ public class SpillableMemoryChannel extends FileChannel {
}
putListByteCount = 0;
} else if (takeCalled) {
- synchronized(queueLock) {
- if (overflowTakeTx!=null) {
+ synchronized (queueLock) {
+ if (overflowTakeTx != null) {
overflowTakeTx.rollback();
}
if (useOverflow) {
@@ -561,8 +574,8 @@ public class SpillableMemoryChannel extends FileChannel {
} else {
int remainingCapacity = memoryCapacity - memQueue.size();
Preconditions.checkState(remainingCapacity >= takeCount,
- "Not enough space in memory queue to rollback takes. This" +
- " should never happen, please report");
+ "Not enough space in memory queue to rollback takes. This" +
+ " should never happen, please report");
while (!takeList.isEmpty()) {
memQueue.addFirst(takeList.removeLast());
}
@@ -582,15 +595,18 @@ public class SpillableMemoryChannel extends FileChannel {
* <li>memoryCapacity = total number of events allowed at one time in the memory queue.
* <li>overflowCapacity = total number of events allowed at one time in the overflow file channel.
* <li>byteCapacity = the max number of bytes used for events in the memory queue.
- * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity and the estimated event size.
- * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to enable overflow
+ * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity
+ * and the estimated event size.
+ * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to
+ * enable overflow
*/
@Override
public void configure(Context context) {
- if (getLifecycleState() == LifecycleState.START // does not support reconfig when running
- || getLifecycleState() == LifecycleState.ERROR)
+ if (getLifecycleState() == LifecycleState.START || // does not support reconfig when running
+ getLifecycleState() == LifecycleState.ERROR) {
stop();
+ }
if (totalStored == null) {
totalStored = new Semaphore(0);
@@ -603,8 +619,7 @@ public class SpillableMemoryChannel extends FileChannel {
// 1) Memory Capacity
Integer newMemoryCapacity;
try {
- newMemoryCapacity = context.getInteger(MEMORY_CAPACITY
- , defaultMemoryCapacity);
+ newMemoryCapacity = context.getInteger(MEMORY_CAPACITY, defaultMemoryCapacity);
if (newMemoryCapacity == null) {
newMemoryCapacity = defaultMemoryCapacity;
}
@@ -612,7 +627,7 @@ public class SpillableMemoryChannel extends FileChannel {
throw new NumberFormatException(MEMORY_CAPACITY + " must be >= 0");
}
- } catch(NumberFormatException e) {
+ } catch (NumberFormatException e) {
newMemoryCapacity = defaultMemoryCapacity;
LOGGER.warn("Invalid " + MEMORY_CAPACITY + " specified, initializing " +
getName() + " channel to default value of {}", defaultMemoryCapacity);
@@ -626,60 +641,60 @@ public class SpillableMemoryChannel extends FileChannel {
// overflowTimeout - wait time before switching to overflow when mem is full
try {
Integer newOverflowTimeout =
- context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout);
+ context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout);
overflowTimeout = (newOverflowTimeout != null) ? newOverflowTimeout
- : defaultOverflowTimeout;
- } catch(NumberFormatException e) {
+ : defaultOverflowTimeout;
+ } catch (NumberFormatException e) {
LOGGER.warn("Incorrect value for " + getName() + "'s " + OVERFLOW_TIMEOUT
- + " setting. Using default value {}", defaultOverflowTimeout);
+ + " setting. Using default value {}", defaultOverflowTimeout);
overflowTimeout = defaultOverflowTimeout;
}
try {
Integer newThreshold = context.getInteger(OVERFLOW_DEACTIVATION_THRESHOLD);
- overflowDeactivationThreshold = (newThreshold != null) ?
- newThreshold/100.0
- : defaultOverflowDeactivationThreshold / 100.0;
- } catch(NumberFormatException e) {
+ overflowDeactivationThreshold = (newThreshold != null) ?
+ newThreshold / 100.0
+ : defaultOverflowDeactivationThreshold / 100.0;
+ } catch (NumberFormatException e) {
LOGGER.warn("Incorrect value for " + getName() + "'s " +
OVERFLOW_DEACTIVATION_THRESHOLD + ". Using default value {} %",
- defaultOverflowDeactivationThreshold);
+ defaultOverflowDeactivationThreshold);
overflowDeactivationThreshold = defaultOverflowDeactivationThreshold / 100.0;
}
// 3) Memory consumption control
try {
byteCapacityBufferPercentage =
- context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE
- , defaultByteCapacityBufferPercentage);
- } catch(NumberFormatException e) {
+ context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE, defaultByteCapacityBufferPercentage);
+ } catch (NumberFormatException e) {
LOGGER.warn("Error parsing " + BYTE_CAPACITY_BUFFER_PERCENTAGE + " for "
- + getName() + ". Using default="
- + defaultByteCapacityBufferPercentage + ". " + e.getMessage());
+ + getName() + ". Using default="
+ + defaultByteCapacityBufferPercentage + ". " + e.getMessage());
byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
}
try {
avgEventSize = context.getInteger(AVG_EVENT_SIZE, defaultAvgEventSize);
- } catch ( NumberFormatException e) {
+ } catch (NumberFormatException e) {
LOGGER.warn("Error parsing " + AVG_EVENT_SIZE + " for " + getName()
- + ". Using default = " + defaultAvgEventSize + ". "
- + e.getMessage());
+ + ". Using default = " + defaultAvgEventSize + ". "
+ + e.getMessage());
avgEventSize = defaultAvgEventSize;
}
try {
- byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) * (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize);
+ byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) *
+ (1 - byteCapacityBufferPercentage * .01)) / avgEventSize);
if (byteCapacity < 1) {
byteCapacity = Integer.MAX_VALUE;
}
- } catch(NumberFormatException e) {
+ } catch (NumberFormatException e) {
LOGGER.warn("Error parsing " + BYTE_CAPACITY + " setting for " + getName()
- + ". Using default = " + defaultByteCapacity + ". "
- + e.getMessage());
+ + ". Using default = " + defaultByteCapacity + ". "
+ + e.getMessage());
byteCapacity = (int)
- ( (defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 ))
- / avgEventSize);
+ ((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01))
+ / avgEventSize);
}
@@ -692,8 +707,10 @@ public class SpillableMemoryChannel extends FileChannel {
lastByteCapacity = byteCapacity;
} else {
try {
- if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, overflowTimeout, TimeUnit.SECONDS)) {
- LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");
+ if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity,
+ overflowTimeout, TimeUnit.SECONDS)) {
+ LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, " +
+ "resizing has been aborted");
} else {
lastByteCapacity = byteCapacity;
}
@@ -704,51 +721,53 @@ public class SpillableMemoryChannel extends FileChannel {
}
try {
- overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity); // file channel capacity
+ // file channel capacity
+ overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity);
// Determine if File channel needs to be disabled
- if ( memoryCapacity < 1 && overflowCapacity < 1) {
- LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY +
- " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " +
- "Using default value " + OVERFLOW_CAPACITY + " = " +
- defaultOverflowCapacity);
- overflowCapacity = defaultOverflowCapacity;
- }
- overflowDisabled = (overflowCapacity < 1) ;
- if (overflowDisabled) {
- overflowActivated = false;
- }
- } catch(NumberFormatException e) {
+ if (memoryCapacity < 1 && overflowCapacity < 1) {
+ LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY +
+ " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " +
+ "Using default value " + OVERFLOW_CAPACITY + " = " +
+ defaultOverflowCapacity);
+ overflowCapacity = defaultOverflowCapacity;
+ }
+ overflowDisabled = (overflowCapacity < 1);
+ if (overflowDisabled) {
+ overflowActivated = false;
+ }
+ } catch (NumberFormatException e) {
overflowCapacity = defaultOverflowCapacity;
}
// Configure File channel
- context.put(KEEP_ALIVE,"0"); // override keep-alive for File channel
- context.put(CAPACITY, Integer.toString(overflowCapacity) ); // file channel capacity
+ context.put(KEEP_ALIVE, "0"); // override keep-alive for File channel
+ context.put(CAPACITY, Integer.toString(overflowCapacity)); // file channel capacity
super.configure(context);
}
private void resizePrimaryQueue(int newMemoryCapacity) throws InterruptedException {
- if (memQueue != null && memoryCapacity == newMemoryCapacity) {
+ if (memQueue != null && memoryCapacity == newMemoryCapacity) {
return;
}
if (memoryCapacity > newMemoryCapacity) {
int diff = memoryCapacity - newMemoryCapacity;
if (!memQueRemaining.tryAcquire(diff, overflowTimeout, TimeUnit.SECONDS)) {
- LOGGER.warn("Memory buffer currently contains more events than the new size. Downsizing has been aborted.");
+ LOGGER.warn("Memory buffer currently contains more events than the new size. " +
+ "Downsizing has been aborted.");
return;
}
- synchronized(queueLock) {
+ synchronized (queueLock) {
ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity);
newQueue.addAll(memQueue);
memQueue = newQueue;
memoryCapacity = newMemoryCapacity;
}
- } else { // if (memoryCapacity <= newMemoryCapacity)
- synchronized(queueLock) {
+ } else { // if (memoryCapacity <= newMemoryCapacity)
+ synchronized (queueLock) {
ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity);
- if (memQueue !=null) {
+ if (memQueue != null) {
newQueue.addAll(memQueue);
}
memQueue = newQueue;
@@ -771,14 +790,14 @@ public class SpillableMemoryChannel extends FileChannel {
drainOrder.putOverflow(overFlowCount);
totalStored.release(overFlowCount);
}
- int totalCount = overFlowCount + memQueue.size();
+ int totalCount = overFlowCount + memQueue.size();
channelCounter.setChannelCapacity(memoryCapacity + getOverflowCapacity());
channelCounter.setChannelSize(totalCount);
}
@Override
public synchronized void stop() {
- if (getLifecycleState()==LifecycleState.STOP) {
+ if (getLifecycleState() == LifecycleState.STOP) {
return;
}
channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
index 713234f..ae31916 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
@@ -99,11 +99,11 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
@Override
public synchronized void append(LoggingEvent event) {
- if(!configured) {
+ if (!configured) {
String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" +
- " send events to Flume.";
+ " send events to Flume.";
LogLog.error(errorMsg);
- if(getUnsafeMode()) {
+ if (getUnsafeMode()) {
return;
}
throw new FlumeException(errorMsg);
@@ -121,10 +121,9 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
@Override
public void activateOptions() throws FlumeException {
try {
- final Properties properties = getProperties(hosts, selector,
- maxBackoff, getTimeout());
+ final Properties properties = getProperties(hosts, selector, maxBackoff, getTimeout());
rpcClient = RpcClientFactory.getInstance(properties);
- if(layout != null) {
+ if (layout != null) {
layout.activateOptions();
}
configured = true;
@@ -169,14 +168,13 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
throw new FlumeException(
"Misconfigured max backoff, value must be greater than 0");
}
- props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,
- String.valueOf(true));
+ props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF, String.valueOf(true));
props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);
}
props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
- String.valueOf(timeout));
+ String.valueOf(timeout));
props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
- String.valueOf(timeout));
+ String.valueOf(timeout));
return props;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 7c483db..f9803e4 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -75,8 +75,7 @@ public class Log4jAppender extends AppenderSkeleton {
private String hostname;
private int port;
private boolean unsafeMode = false;
- private long timeout = RpcClientConfigurationConstants
- .DEFAULT_REQUEST_TIMEOUT_MILLIS;
+ private long timeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
private boolean avroReflectionEnabled;
private String avroSchemaUrl;
@@ -99,7 +98,7 @@ public class Log4jAppender extends AppenderSkeleton {
* @param port The port to connect on the host.
*
*/
- public Log4jAppender(String hostname, int port){
+ public Log4jAppender(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
@@ -112,14 +111,14 @@ public class Log4jAppender extends AppenderSkeleton {
* was a connection error.
*/
@Override
- public synchronized void append(LoggingEvent event) throws FlumeException{
+ public synchronized void append(LoggingEvent event) throws FlumeException {
//If rpcClient is null, it means either this appender object was never
//setup by setting hostname and port and then calling activateOptions
//or this appender object was closed by calling close(), so we throw an
//exception to show the appender is no longer accessible.
if (rpcClient == null) {
String errorMsg = "Cannot Append to Appender! Appender either closed or" +
- " not setup correctly!";
+ " not setup correctly!";
LogLog.error(errorMsg);
if (unsafeMode) {
return;
@@ -127,7 +126,7 @@ public class Log4jAppender extends AppenderSkeleton {
throw new FlumeException(errorMsg);
}
- if(!rpcClient.isActive()){
+ if (!rpcClient.isActive()) {
reconnect();
}
@@ -231,7 +230,7 @@ public class Log4jAppender extends AppenderSkeleton {
} else {
String errorMsg = "Flume log4jappender already closed!";
LogLog.error(errorMsg);
- if(unsafeMode) {
+ if (unsafeMode) {
return;
}
throw new FlumeException(errorMsg);
@@ -251,7 +250,7 @@ public class Log4jAppender extends AppenderSkeleton {
* Set the first flume hop hostname.
* @param hostname The first hop where the client should connect to.
*/
- public void setHostname(String hostname){
+ public void setHostname(String hostname) {
this.hostname = hostname;
}
@@ -259,7 +258,7 @@ public class Log4jAppender extends AppenderSkeleton {
* Set the port on the hostname to connect to.
* @param port The port to connect on the host.
*/
- public void setPort(int port){
+ public void setPort(int port) {
this.port = port;
}
@@ -299,19 +298,18 @@ public class Log4jAppender extends AppenderSkeleton {
Properties props = new Properties();
props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
- hostname + ":" + port);
+ hostname + ":" + port);
props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
- String.valueOf(timeout));
+ String.valueOf(timeout));
props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
- String.valueOf(timeout));
+ String.valueOf(timeout));
try {
rpcClient = RpcClientFactory.getInstance(props);
if (layout != null) {
layout.activateOptions();
}
} catch (FlumeException e) {
- String errormsg = "RPC client creation failed! " +
- e.getMessage();
+ String errormsg = "RPC client creation failed! " + e.getMessage();
LogLog.error(errormsg);
if (unsafeMode) {
return;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
index b68e749..22983d3 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
@@ -30,24 +30,23 @@ public enum Log4jAvroHeaders {
AVRO_SCHEMA_URL("flume.avro.schema.url");
private String headerName;
- private Log4jAvroHeaders(String headerName){
+ private Log4jAvroHeaders(String headerName) {
this.headerName = headerName;
}
- public String getName(){
+ public String getName() {
return headerName;
}
- public String toString(){
+ public String toString() {
return getName();
}
- public static Log4jAvroHeaders getByName(String headerName){
+ public static Log4jAvroHeaders getByName(String headerName) {
Log4jAvroHeaders hdrs = null;
- try{
+ try {
hdrs = Log4jAvroHeaders.valueOf(headerName.toLowerCase(Locale.ENGLISH).trim());
- }
- catch(IllegalArgumentException e){
+ } catch (IllegalArgumentException e) {
hdrs = Log4jAvroHeaders.OTHER;
}
return hdrs;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
index c0460d2..f00b571 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
@@ -86,7 +86,7 @@ public class Context {
Preconditions.checkArgument(prefix.endsWith("."),
"The given prefix does not end with a period (" + prefix + ")");
Map<String, String> result = Maps.newHashMap();
- synchronized(parameters) {
+ synchronized (parameters) {
for (String key : parameters.keySet()) {
if (key.startsWith(prefix)) {
String name = key.substring(prefix.length());
@@ -129,7 +129,7 @@ public class Context {
*/
public Boolean getBoolean(String key, Boolean defaultValue) {
String value = get(key);
- if(value != null) {
+ if (value != null) {
return Boolean.parseBoolean(value.trim());
}
return defaultValue;
@@ -158,7 +158,7 @@ public class Context {
*/
public Integer getInteger(String key, Integer defaultValue) {
String value = get(key);
- if(value != null) {
+ if (value != null) {
return Integer.parseInt(value.trim());
}
return defaultValue;
@@ -187,7 +187,7 @@ public class Context {
*/
public Long getLong(String key, Long defaultValue) {
String value = get(key);
- if(value != null) {
+ if (value != null) {
return Long.parseLong(value.trim());
}
return defaultValue;
@@ -227,7 +227,7 @@ public class Context {
}
private String get(String key, String defaultValue) {
String result = parameters.get(key);
- if(result != null) {
+ if (result != null) {
return result;
}
return defaultValue;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
index d6aa33a..9089122 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
@@ -22,7 +22,6 @@ public final class BasicConfigurationConstants {
public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";
-
public static final String CONFIG_SINKS = "sinks";
public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
index 0e0614e..477a3e6 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
@@ -64,7 +64,7 @@ public abstract class ComponentConfiguration {
failIfConfigured();
String confType = context.getString(
BasicConfigurationConstants.CONFIG_TYPE);
- if (confType != null && !confType.isEmpty()){
+ if (confType != null && !confType.isEmpty()) {
this.type = confType;
}
// Type can be set by child class constructors, so check if it was.
@@ -74,12 +74,12 @@ public abstract class ComponentConfiguration {
FlumeConfigurationErrorType.ATTRS_MISSING, ErrorOrWarning.ERROR));
throw new ConfigurationException(
- "Component has no type. Cannot configure. "+ componentName);
+ "Component has no type. Cannot configure. " + componentName);
}
}
protected void failIfConfigured() throws ConfigurationException {
- if (configured){
+ if (configured) {
throw new ConfigurationException("Already configured component."
+ componentName);
}
@@ -134,12 +134,13 @@ public abstract class ComponentConfiguration {
CHANNELSELECTOR("ChannelSelector");
private final String componentType;
- private ComponentType(String type){
+
+ private ComponentType(String type) {
componentType = type;
}
+
public String getComponentType() {
return componentType;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
index 0433c9c..16860c3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
@@ -27,9 +27,9 @@ import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfig
import org.apache.flume.conf.source.SourceConfiguration.SourceConfigurationType;
public class ComponentConfigurationFactory {
+
@SuppressWarnings("unchecked")
- public static ComponentConfiguration
- create(String name, String type, ComponentType component)
+ public static ComponentConfiguration create(String name, String type, ComponentType component)
throws ConfigurationException {
Class<? extends ComponentConfiguration> confType = null;
@@ -43,7 +43,7 @@ public class ComponentConfigurationFactory {
} catch (Exception ignored) {
try {
type = type.toUpperCase(Locale.ENGLISH);
- switch(component){
+ switch (component) {
case SOURCE:
return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
@@ -63,8 +63,7 @@ public class ComponentConfigurationFactory {
return new SinkGroupConfiguration(name);
default:
throw new ConfigurationException(
- "Cannot create configuration. Unknown Type specified: " +
- type);
+ "Cannot create configuration. Unknown Type specified: " + type);
}
} catch (ConfigurationException e) {
throw e;