You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:28 UTC
[02/11] flume git commit: FLUME-2937. Integrate checkstyle for
non-test classes
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index e659ada..4c8b52b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -18,15 +18,10 @@
*/
package org.apache.flume.sink.hbase;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.flume.Channel;
@@ -52,14 +47,16 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
-
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
/**
- *
* A simple sink which reads events from a channel and writes them to HBase.
* The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
@@ -73,7 +70,7 @@ import java.security.PrivilegedExceptionAction;
* batch size, whichever comes first.<p>
* Other optional parameters are:<p>
* <tt>serializer:</tt> A class implementing {@link HbaseEventSerializer}.
- * An instance of
+ * An instance of
* this class will be used to write out events to hbase.<p>
* <tt>serializer.*:</tt> Passed in the configure() method to serializer
* as an object of {@link org.apache.flume.Context}.<p>
@@ -81,7 +78,7 @@ import java.security.PrivilegedExceptionAction;
* maximum number of events the sink will commit per transaction. The default
* batch size is 100 events.
* <p>
- *
+ * <p>
* <strong>Note: </strong> While this sink flushes all events in a transaction
* to HBase in one shot, Hbase does not guarantee atomic commits on multiple
* rows. So if a subset of events in a batch are written to disk by Hbase and
@@ -113,11 +110,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
// Internal hooks used for unit testing.
private DebugIncrementsCallback debugIncrCallback = null;
- public HBaseSink(){
+ public HBaseSink() {
this(HBaseConfiguration.create());
}
- public HBaseSink(Configuration conf){
+ public HBaseSink(Configuration conf) {
this.config = conf;
}
@@ -129,15 +126,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
@Override
- public void start(){
+ public void start() {
Preconditions.checkArgument(table == null, "Please call stop " +
"before calling start on an old instance.");
try {
- privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
+ privilegedExecutor =
+ FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Failed to login to HBase using "
- + "provided credentials.", ex);
+ + "provided credentials.", ex);
}
try {
table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
@@ -165,16 +163,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
})) {
throw new IOException("Table " + tableName
- + " has no such column family " + Bytes.toString(columnFamily));
+ + " has no such column family " + Bytes.toString(columnFamily));
}
} catch (Exception e) {
//Get getTableDescriptor also throws IOException, so catch the IOException
//thrown above or by the getTableDescriptor() call.
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Error getting column family from HBase."
- + "Please verify that the table " + tableName + " and Column Family, "
- + Bytes.toString(columnFamily) + " exists in HBase, and the"
- + " current user has permissions to access that table.", e);
+ + "Please verify that the table " + tableName + " and Column Family, "
+ + Bytes.toString(columnFamily) + " exists in HBase, and the"
+ + " current user has permissions to access that table.", e);
}
super.start();
@@ -183,7 +181,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
@Override
- public void stop(){
+ public void stop() {
try {
if (table != null) {
table.close();
@@ -198,7 +196,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
@SuppressWarnings("unchecked")
@Override
- public void configure(Context context){
+ public void configure(Context context) {
tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
String cf = context.getString(
HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
@@ -213,48 +211,48 @@ public class HBaseSink extends AbstractSink implements Configurable {
Preconditions.checkNotNull(cf,
"Column family cannot be empty, please specify in configuration file");
//Check foe event serializer, if null set event serializer type
- if(eventSerializerType == null || eventSerializerType.isEmpty()) {
+ if (eventSerializerType == null || eventSerializerType.isEmpty()) {
eventSerializerType =
"org.apache.flume.sink.hbase.SimpleHbaseEventSerializer";
logger.info("No serializer defined, Will use default");
}
serializerContext.putAll(context.getSubProperties(
- HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
+ HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
columnFamily = cf.getBytes(Charsets.UTF_8);
try {
Class<? extends HbaseEventSerializer> clazz =
(Class<? extends HbaseEventSerializer>)
- Class.forName(eventSerializerType);
+ Class.forName(eventSerializerType);
serializer = clazz.newInstance();
serializer.configure(serializerContext);
} catch (Exception e) {
- logger.error("Could not instantiate event serializer." , e);
+ logger.error("Could not instantiate event serializer.", e);
Throwables.propagate(e);
}
kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
enableWal = context.getBoolean(HBaseSinkConfigurationConstants
- .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+ .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
- if(!enableWal) {
+ if (!enableWal) {
logger.warn("HBase Sink's enableWal configuration is set to false. All " +
- "writes to HBase will have WAL disabled, and any data in the " +
- "memstore of this region in the Region Server could be lost!");
+ "writes to HBase will have WAL disabled, and any data in the " +
+ "memstore of this region in the Region Server could be lost!");
}
batchIncrements = context.getBoolean(
- HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
- HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+ HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
if (batchIncrements) {
logger.info("Increment coalescing is enabled. Increments will be " +
- "buffered.");
+ "buffered.");
refGetFamilyMap = reflectLookupGetFamilyMap();
}
String zkQuorum = context.getString(HBaseSinkConfigurationConstants
- .ZK_QUORUM);
+ .ZK_QUORUM);
Integer port = null;
/**
* HBase allows multiple nodes in the quorum, but all need to use the
@@ -267,10 +265,10 @@ public class HBaseSink extends AbstractSink implements Configurable {
logger.info("Using ZK Quorum: " + zkQuorum);
String[] zkHosts = zkQuorum.split(",");
int length = zkHosts.length;
- for(int i = 0; i < length; i++) {
+ for (int i = 0; i < length; i++) {
String[] zkHostAndPort = zkHosts[i].split(":");
zkBuilder.append(zkHostAndPort[0].trim());
- if(i != length-1) {
+ if (i != length - 1) {
zkBuilder.append(",");
} else {
zkQuorum = zkBuilder.toString();
@@ -282,18 +280,18 @@ public class HBaseSink extends AbstractSink implements Configurable {
port = Integer.parseInt(zkHostAndPort[1].trim());
} else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
throw new FlumeException("All Zookeeper nodes in the quorum must " +
- "use the same client port.");
+ "use the same client port.");
}
}
- if(port == null) {
+ if (port == null) {
port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
}
this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
}
String hbaseZnode = context.getString(
- HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
- if(hbaseZnode != null && !hbaseZnode.isEmpty()) {
+ HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
+ if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
}
sinkCounter = new SinkCounter(this.getName());
@@ -314,7 +312,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
txn.begin();
if (serializer instanceof BatchAware) {
- ((BatchAware)serializer).onBatchStart();
+ ((BatchAware) serializer).onBatchStart();
}
long i = 0;
@@ -342,15 +340,15 @@ public class HBaseSink extends AbstractSink implements Configurable {
putEventsAndCommit(actions, incs, txn);
} catch (Throwable e) {
- try{
+ try {
txn.rollback();
} catch (Exception e2) {
logger.error("Exception in rollback. Rollback might not have been " +
- "successful." , e2);
+ "successful.", e2);
}
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
- if(e instanceof Error || e instanceof RuntimeException){
+ if (e instanceof Error || e instanceof RuntimeException) {
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
@@ -367,7 +365,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
private void putEventsAndCommit(final List<Row> actions,
- final List<Increment> incs, Transaction txn) throws Exception {
+ final List<Increment> incs, Transaction txn) throws Exception {
privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
@Override
@@ -421,7 +419,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
@VisibleForTesting
static Method reflectLookupGetFamilyMap() {
Method m = null;
- String[] methodNames = { "getFamilyMapOfLongs", "getFamilyMap" };
+ String[] methodNames = {"getFamilyMapOfLongs", "getFamilyMap"};
for (String methodName : methodNames) {
try {
m = Increment.class.getMethod(methodName);
@@ -447,7 +445,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
@SuppressWarnings("unchecked")
private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
Preconditions.checkNotNull(refGetFamilyMap,
- "Increment.getFamilymap() not found");
+ "Increment.getFamilymap() not found");
Preconditions.checkNotNull(inc, "Increment required");
Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
try {
@@ -466,6 +464,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
/**
* Perform "compression" on the given set of increments so that Flume sends
* the minimum possible number of RPC operations to HBase per batch.
+ *
* @param incs Input: Increment objects to coalesce.
* @return List of new Increment objects after coalescing the unique counts.
*/
@@ -478,7 +477,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
for (Increment inc : incs) {
byte[] row = inc.getRow();
Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
- for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) {
+ for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
byte[] family = familyEntry.getKey();
NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
@@ -491,9 +490,10 @@ public class HBaseSink extends AbstractSink implements Configurable {
// Reconstruct list of Increments per unique row/family/qualifier.
List<Increment> coalesced = Lists.newLinkedList();
- for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
+ for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry :
+ counters.entrySet()) {
byte[] row = rowEntry.getKey();
- Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+ Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
Increment inc = new Increment(row);
for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
byte[] family = familyEntry.getKey();
@@ -513,11 +513,12 @@ public class HBaseSink extends AbstractSink implements Configurable {
/**
* Helper function for {@link #coalesceIncrements} to increment a counter
* value in the passed data structure.
- * @param counters Nested data structure containing the counters.
- * @param row Row key to increment.
- * @param family Column family to increment.
+ *
+ * @param counters Nested data structure containing the counters.
+ * @param row Row key to increment.
+ * @param family Column family to increment.
* @param qualifier Column qualifier to increment.
- * @param count Amount to increment by.
+ * @param count Amount to increment by.
*/
private void incrementCounter(
Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
index 2c0f0e6..d4e3f84 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
@@ -32,13 +32,12 @@ import org.apache.hadoop.hbase.client.Row;
* params required should be taken through this. Only the column family is
* passed in. The columns should exist in the table and column family
* specified in the configuration for the HbaseSink.
- *
*/
-public interface HbaseEventSerializer extends Configurable,
- ConfigurableComponent {
+public interface HbaseEventSerializer extends Configurable, ConfigurableComponent {
/**
* Initialize the event serializer.
- * @param Event to be written to HBase.
+ * @param event Event to be written to HBase
+ * @param columnFamily Column family to write to
*/
public void initialize(Event event, byte[] columnFamily);
@@ -54,10 +53,9 @@ public interface HbaseEventSerializer extends Configurable,
public List<Row> getActions();
public List<Increment> getIncrements();
+
/*
* Clean up any state. This will be called when the sink is being stopped.
*/
public void close();
-
-
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 7d2b8b7..8342d67 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -18,14 +18,8 @@
*/
package org.apache.flume.sink.hbase;
-import java.nio.charset.Charset;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -35,20 +29,25 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
- * An {@link HbaseEventSerializer} which parses columns based on a supplied
- * regular expression and column name list.
- *
+ * An {@link HbaseEventSerializer} which parses columns based on a supplied
+ * regular expression and column name list.
+ * <p>
* Note that if the regular expression does not return the correct number of
* groups for a particular event, or it does not correctly match an event,
* the event is silently dropped.
- *
+ * <p>
* Row keys for each event consist of a timestamp concatenated with an
* identifier which enforces uniqueness of keys across flume agents.
- *
+ * <p>
* See static constant variables for configuration options.
*/
public class RegexHbaseEventSerializer implements HbaseEventSerializer {
@@ -108,21 +107,21 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
String[] columnNames = colNameStr.split(",");
- for (String s: columnNames) {
+ for (String s : columnNames) {
colNames.add(s.getBytes(charset));
}
//Rowkey is optional, default is -1
rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1);
//if row key is being used, make sure it is specified correct
- if(rowKeyIndex >=0){
- if(rowKeyIndex >= columnNames.length) {
+ if (rowKeyIndex >= 0) {
+ if (rowKeyIndex >= columnNames.length) {
throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " +
- "less than num columns " + columnNames.length);
+ "less than num columns " + columnNames.length);
}
- if(!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
+ if (!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be "
- + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
+ + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
}
}
}
@@ -181,15 +180,15 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
}
try {
- if(rowKeyIndex < 0){
+ if (rowKeyIndex < 0) {
rowKey = getRowKey();
- }else{
+ } else {
rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
}
Put put = new Put(rowKey);
for (int i = 0; i < colNames.size(); i++) {
- if(i != rowKeyIndex) {
+ if (i != rowKeyIndex) {
put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
}
}
@@ -211,5 +210,6 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
}
@Override
- public void close() { }
+ public void close() {
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
index 96095d1..3f442e8 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
@@ -18,18 +18,17 @@
*/
package org.apache.flume.sink.hbase;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
-import org.hbase.async.AtomicIncrementRequest;
-import org.hbase.async.PutRequest;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
+import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.PutRequest;
-import com.google.common.base.Charsets;
+import java.util.ArrayList;
+import java.util.List;
/**
* A simple serializer to be used with the AsyncHBaseSink
@@ -69,7 +68,7 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
@Override
public List<PutRequest> getActions() {
List<PutRequest> actions = new ArrayList<PutRequest>();
- if(payloadColumn != null){
+ if (payloadColumn != null) {
byte[] rowKey;
try {
switch (keyType) {
@@ -89,17 +88,16 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
PutRequest putRequest = new PutRequest(table, rowKey, cf,
payloadColumn, payload);
actions.add(putRequest);
- } catch (Exception e){
+ } catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
- public List<AtomicIncrementRequest> getIncrements(){
- List<AtomicIncrementRequest> actions = new
- ArrayList<AtomicIncrementRequest>();
- if(incrementColumn != null) {
+ public List<AtomicIncrementRequest> getIncrements() {
+ List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
+ if (incrementColumn != null) {
AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
incrementRow, cf, incrementColumn);
actions.add(inc);
@@ -119,23 +117,22 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
String iCol = context.getString("incrementColumn", "iCol");
rowPrefix = context.getString("rowPrefix", "default");
String suffix = context.getString("suffix", "uuid");
- if(pCol != null && !pCol.isEmpty()) {
- if(suffix.equals("timestamp")){
+ if (pCol != null && !pCol.isEmpty()) {
+ if (suffix.equals("timestamp")) {
keyType = KeyType.TS;
} else if (suffix.equals("random")) {
keyType = KeyType.RANDOM;
- } else if(suffix.equals("nano")){
+ } else if (suffix.equals("nano")) {
keyType = KeyType.TSNANO;
} else {
keyType = KeyType.UUID;
}
payloadColumn = pCol.getBytes(Charsets.UTF_8);
}
- if(iCol != null && !iCol.isEmpty()) {
+ if (iCol != null && !iCol.isEmpty()) {
incrementColumn = iCol.getBytes(Charsets.UTF_8);
}
- incrementRow =
- context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
+ incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
}
@Override
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
index 758252b..dc89fd7 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
@@ -19,9 +19,7 @@
package org.apache.flume.sink.hbase;
-import java.util.LinkedList;
-import java.util.List;
-
+import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
@@ -30,19 +28,18 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
-import com.google.common.base.Charsets;
+import java.util.LinkedList;
+import java.util.List;
/**
* A simple serializer that returns puts from an event, by writing the event
* body into it. The headers are discarded. It also updates a row in hbase
* which acts as an event counter.
- *
- * Takes optional parameters:<p>
+ * <p>Takes optional parameters:<p>
* <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
* <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
* <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
- *
- * Mandatory parameters: <p>
+ * <p>Mandatory parameters: <p>
* <tt>cf:</tt>Column family.<p>
* Components that have no defaults and will not be used if null:
* <tt>payloadColumn:</tt> Which column to put payload in. If it is null,
@@ -59,8 +56,7 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
private KeyType keyType;
private byte[] payload;
- public SimpleHbaseEventSerializer(){
-
+ public SimpleHbaseEventSerializer() {
}
@Override
@@ -70,21 +66,21 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
String suffix = context.getString("suffix", "uuid");
- String payloadColumn = context.getString("payloadColumn","pCol");
- String incColumn = context.getString("incrementColumn","iCol");
- if(payloadColumn != null && !payloadColumn.isEmpty()) {
- if(suffix.equals("timestamp")){
+ String payloadColumn = context.getString("payloadColumn", "pCol");
+ String incColumn = context.getString("incrementColumn", "iCol");
+ if (payloadColumn != null && !payloadColumn.isEmpty()) {
+ if (suffix.equals("timestamp")) {
keyType = KeyType.TS;
} else if (suffix.equals("random")) {
keyType = KeyType.RANDOM;
- } else if(suffix.equals("nano")){
+ } else if (suffix.equals("nano")) {
keyType = KeyType.TSNANO;
} else {
keyType = KeyType.UUID;
}
plCol = payloadColumn.getBytes(Charsets.UTF_8);
}
- if(incColumn != null && !incColumn.isEmpty()) {
+ if (incColumn != null && !incColumn.isEmpty()) {
incCol = incColumn.getBytes(Charsets.UTF_8);
}
}
@@ -102,14 +98,14 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
@Override
public List<Row> getActions() throws FlumeException {
List<Row> actions = new LinkedList<Row>();
- if(plCol != null){
+ if (plCol != null) {
byte[] rowKey;
try {
if (keyType == KeyType.TS) {
rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
- } else if(keyType == KeyType.RANDOM) {
+ } else if (keyType == KeyType.RANDOM) {
rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
- } else if(keyType == KeyType.TSNANO) {
+ } else if (keyType == KeyType.TSNANO) {
rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
} else {
rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
@@ -117,33 +113,34 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
Put put = new Put(rowKey);
put.add(cf, plCol, payload);
actions.add(put);
- } catch (Exception e){
+ } catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
- @Override
- public List<Increment> getIncrements(){
- List<Increment> incs = new LinkedList<Increment>();
- if(incCol != null) {
- Increment inc = new Increment(incrementRow);
- inc.addColumn(cf, incCol, 1);
- incs.add(inc);
- }
- return incs;
- }
- @Override
- public void close() {
+ @Override
+ public List<Increment> getIncrements() {
+ List<Increment> incs = new LinkedList<Increment>();
+ if (incCol != null) {
+ Increment inc = new Increment(incrementRow);
+ inc.addColumn(cf, incCol, 1);
+ incs.add(inc);
}
+ return incs;
+ }
- public enum KeyType{
- UUID,
- RANDOM,
- TS,
- TSNANO;
- }
+ @Override
+ public void close() {
+ }
+ public enum KeyType {
+ UUID,
+ RANDOM,
+ TS,
+ TSNANO;
}
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
index b25eb6a..2d654f2 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
@@ -25,28 +25,22 @@ import java.util.UUID;
/**
* Utility class for users to generate their own keys. Any key can be used,
* this is just a utility that provides a set of simple keys.
- *
- *
*/
public class SimpleRowKeyGenerator {
- public static byte[] getUUIDKey(String prefix)
- throws UnsupportedEncodingException{
+ public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
}
- public static byte[] getRandomKey(String prefix)
- throws UnsupportedEncodingException{
+ public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
}
- public static byte[] getTimestampKey(String prefix)
- throws UnsupportedEncodingException {
- return (prefix + String.valueOf(
- System.currentTimeMillis())).getBytes("UTF8");
+
+ public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
+ return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}
- public static byte[] getNanoTimestampKey(String prefix)
- throws UnsupportedEncodingException{
- return (prefix + String.valueOf(
- System.nanoTime())).getBytes("UTF8");
+
+ public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
+ return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 7bef7f3..9453546 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -157,7 +157,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (event == null) {
// no events available in channel
- if(processedEvents == 0) {
+ if (processedEvents == 0) {
result = Status.BACKOFF;
counter.incrementBatchEmptyCount();
} else {
@@ -177,7 +177,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (logger.isDebugEnabled()) {
logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
- + new String(eventBody, "UTF-8"));
+ + new String(eventBody, "UTF-8"));
logger.debug("event #{}", processedEvents);
}
@@ -185,8 +185,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
long startTime = System.currentTimeMillis();
try {
- kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)),
- new SinkCallback(startTime)));
+ kafkaFutures.add(producer.send(
+ new ProducerRecord<String, byte[]>(eventTopic, eventKey,
+ serializeEvent(event, useAvroEventFormat)),
+ new SinkCallback(startTime)));
} catch (IOException ex) {
throw new EventDeliveryException("Could not serialize event", ex);
}
@@ -197,11 +199,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
// publish batch and commit.
if (processedEvents > 0) {
- for (Future<RecordMetadata> future : kafkaFutures) {
- future.get();
- }
+ for (Future<RecordMetadata> future : kafkaFutures) {
+ future.get();
+ }
long endTime = System.nanoTime();
- counter.addToKafkaEventSendTimer((endTime-batchStartTime)/(1000*1000));
+ counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
}
@@ -270,8 +272,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (topicStr == null || topicStr.isEmpty()) {
topicStr = DEFAULT_TOPIC;
logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
- }
- else {
+ } else {
logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
}
@@ -283,7 +284,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
logger.debug("Using batch size: {}", batchSize);
}
- useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT);
+ useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT,
+ KafkaSinkConstants.DEFAULT_AVRO_EVENT);
if (logger.isDebugEnabled()) {
logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
@@ -322,7 +324,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
throw new ConfigurationException("Bootstrap Servers must be specified");
} else {
ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
- logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+ logger.warn("{} is deprecated. Please use the parameter {}",
+ BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
}
}
@@ -348,21 +351,18 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
- "a different interface for serializers. Please use the parameter {}",
- KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ "a different interface for serializers. Please use the parameter {}",
+ KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
- "a different interface for serializers. Please use the parameter {}",
- MESSAGE_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ "a different interface for serializers. Please use the parameter {}",
+ MESSAGE_SERIALIZER_KEY,
+ KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
-
-
-
-
-
}
+
private void setProducerProps(Context context, String bootStrapServers) {
kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
//Defaults overridden based on config
@@ -387,7 +387,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
}
tempOutStream.get().reset();
- AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
+ AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()),
+ ByteBuffer.wrap(event.getBody()));
encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder);
writer.get().write(e, encoder);
encoder.flush();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 6b64bc1..1bf380c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -29,7 +29,8 @@ public class KafkaSinkConstants {
public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
public static final String BATCH_SIZE = "flumeBatchSize";
- public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String BOOTSTRAP_SERVERS_CONFIG =
+ KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
public static final String KEY_HEADER = "key";
public static final String TOPIC_HEADER = "topic";
@@ -37,25 +38,23 @@ public class KafkaSinkConstants {
public static final String AVRO_EVENT = "useFlumeEventFormat";
public static final boolean DEFAULT_AVRO_EVENT = false;
- public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+ public static final String DEFAULT_KEY_SERIALIZER =
+ "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String DEFAULT_VALUE_SERIAIZER =
+ "org.apache.kafka.common.serialization.ByteArraySerializer";
public static final int DEFAULT_BATCH_SIZE = 100;
public static final String DEFAULT_TOPIC = "default-flume-topic";
public static final String DEFAULT_ACKS = "1";
-
/* Old Properties */
- /* Properties */
+ /* Properties */
public static final String OLD_BATCH_SIZE = "batchSize";
public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
public static final String BROKER_LIST_FLUME_KEY = "brokerList";
public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
-
-
-
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
index 12bdc40..095f889 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
@@ -84,7 +84,8 @@ public class BlobDeserializer implements EventDeserializer {
blob.write(buf, 0, n);
blobLength += n;
if (blobLength >= maxBlobLength) {
- LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+ LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!",
+ maxBlobLength);
break;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
index e84dec1..ca7614a 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
@@ -87,7 +87,8 @@ public class BlobHandler implements HTTPSourceHandler {
blob.write(buf, 0, n);
blobLength += n;
if (blobLength >= maxBlobLength) {
- LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+ LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!",
+ maxBlobLength);
break;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
index d3154af..d877814 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
@@ -97,8 +97,10 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
.build();
}
- Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
- morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);
+ Config override = ConfigFactory.parseMap(
+ context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
+ morphline = new Compiler().compile(
+ new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);
this.mappingTimer = morphlineContext.getMetricRegistry().timer(
MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
index ef8f716..3b94133 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -47,12 +47,13 @@ import com.google.common.io.ByteStreams;
public class MorphlineInterceptor implements Interceptor {
private final Context context;
- private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>();
+ private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<>();
protected MorphlineInterceptor(Context context) {
Preconditions.checkNotNull(context);
this.context = context;
- returnToPool(new LocalMorphlineInterceptor(context)); // fail fast on morphline compilation exception
+ // fail fast on morphline compilation exception
+ returnToPool(new LocalMorphlineInterceptor(context));
}
@Override
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 9c4dc25..f7a73f3 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -160,15 +160,15 @@ public class MorphlineSink extends AbstractSink implements Configurable {
return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
} catch (Throwable t) {
// Ooops - need to rollback and back off
- LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
- + ". Exception follows.", t);
+ LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
+ myChannel.getName() + ". Exception follows.", t);
try {
if (!isMorphlineTransactionCommitted) {
handler.rollbackTransaction();
}
} catch (Throwable t2) {
- LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
- "Exception follows.", t2);
+ LOGGER.error("Morphline Sink " + getName() +
+ ": Unable to rollback morphline transaction. Exception follows.", t2);
} finally {
try {
txn.rollback();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
index 6b327ce..acb5118 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
@@ -57,15 +57,17 @@ import org.apache.flume.event.SimpleEvent;
public class DefaultJMSMessageConverter implements JMSMessageConverter {
private final Charset charset;
+
private DefaultJMSMessageConverter(String charset) {
this.charset = Charset.forName(charset);
}
+
public static class Builder implements JMSMessageConverter.Builder {
@Override
public JMSMessageConverter build(Context context) {
- return new DefaultJMSMessageConverter(context.
- getString(JMSSourceConfiguration.CONVERTER_CHARSET,
- JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim());
+ return new DefaultJMSMessageConverter(context.getString(
+ JMSSourceConfiguration.CONVERTER_CHARSET,
+ JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim());
}
}
@@ -75,52 +77,52 @@ public class DefaultJMSMessageConverter implements JMSMessageConverter {
Map<String, String> headers = event.getHeaders();
@SuppressWarnings("rawtypes")
Enumeration propertyNames = message.getPropertyNames();
- while(propertyNames.hasMoreElements()) {
+ while (propertyNames.hasMoreElements()) {
String name = propertyNames.nextElement().toString();
String value = message.getStringProperty(name);
headers.put(name, value);
}
- if(message instanceof BytesMessage) {
+ if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage)message;
long length = bytesMessage.getBodyLength();
- if(length > 0L) {
+ if (length > 0L) {
if (length > Integer.MAX_VALUE) {
throw new JMSException("Unable to process message " + "of size "
+ length);
}
byte[] body = new byte[(int)length];
int count = bytesMessage.readBytes(body);
- if(count != length) {
+ if (count != length) {
throw new JMSException("Unable to read full message. " +
"Read " + count + " of total " + length);
}
event.setBody(body);
}
- } else if(message instanceof TextMessage) {
+ } else if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
event.setBody(textMessage.getText().getBytes(charset));
- } else if(message instanceof ObjectMessage) {
+ } else if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage)message;
Object object = objectMessage.getObject();
- if(object != null) {
+ if (object != null) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(object);
event.setBody(bos.toByteArray());
- } catch(IOException e) {
+ } catch (IOException e) {
throw new FlumeException("Error serializing object", e);
} finally {
try {
- if(out != null) {
+ if (out != null) {
out.close();
}
} catch (IOException e) {
throw new FlumeException("Error closing ObjectOutputStream", e);
}
try {
- if(bos != null) {
+ if (bos != null) {
bos.close();
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
index 2f0220a..8874dd1 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
@@ -22,7 +22,6 @@ import java.util.Properties;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-
public class InitialContextFactory {
public InitialContext create(Properties properties) throws NamingException {
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 7a9461b..6b3a1cf 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -18,8 +18,12 @@
*/
package org.apache.flume.source.jms;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -30,14 +34,8 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
class JMSMessageConsumer {
private static final Logger logger = LoggerFactory
@@ -52,11 +50,11 @@ class JMSMessageConsumer {
private final Destination destination;
private final MessageConsumer messageConsumer;
- JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory, String destinationName,
- JMSDestinationLocator destinationLocator, JMSDestinationType destinationType,
- String messageSelector, int batchSize, long pollTimeout,
- JMSMessageConverter messageConverter,
- Optional<String> userName, Optional<String> password) {
+ JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory,
+ String destinationName, JMSDestinationLocator destinationLocator,
+ JMSDestinationType destinationType, String messageSelector, int batchSize,
+ long pollTimeout, JMSMessageConverter messageConverter,
+ Optional<String> userName, Optional<String> password) {
this.batchSize = batchSize;
this.pollTimeout = pollTimeout;
this.messageConverter = messageConverter;
@@ -65,7 +63,7 @@ class JMSMessageConsumer {
Preconditions.checkArgument(pollTimeout >= 0, "Poll timeout cannot be " +
"negative");
try {
- if(userName.isPresent()) {
+ if (userName.isPresent()) {
connection = connectionFactory.createConnection(userName.get(),
password.get());
} else {
@@ -82,37 +80,37 @@ class JMSMessageConsumer {
throw new FlumeException("Could not create session", e);
}
- try {
- if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
- switch (destinationType) {
- case QUEUE:
- destination = session.createQueue(destinationName);
- break;
- case TOPIC:
- destination = session.createTopic(destinationName);
- break;
- default:
- throw new IllegalStateException(String.valueOf(destinationType));
+ try {
+ if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
+ switch (destinationType) {
+ case QUEUE:
+ destination = session.createQueue(destinationName);
+ break;
+ case TOPIC:
+ destination = session.createTopic(destinationName);
+ break;
+ default:
+ throw new IllegalStateException(String.valueOf(destinationType));
+ }
+ } else {
+ destination = (Destination) initialContext.lookup(destinationName);
}
- } else {
- destination = (Destination) initialContext.lookup(destinationName);
+ } catch (JMSException e) {
+ throw new FlumeException("Could not create destination " + destinationName, e);
+ } catch (NamingException e) {
+ throw new FlumeException("Could not find destination " + destinationName, e);
}
- } catch (JMSException e) {
- throw new FlumeException("Could not create destination " + destinationName, e);
- } catch (NamingException e) {
- throw new FlumeException("Could not find destination " + destinationName, e);
- }
- try {
+ try {
messageConsumer = session.createConsumer(destination,
- messageSelector.isEmpty() ? null: messageSelector);
+ messageSelector.isEmpty() ? null : messageSelector);
} catch (JMSException e) {
throw new FlumeException("Could not create consumer", e);
}
String startupMsg = String.format("Connected to '%s' of type '%s' with " +
- "user '%s', batch size '%d', selector '%s' ", destinationName,
+ "user '%s', batch size '%d', selector '%s' ", destinationName,
destinationType, userName.isPresent() ? userName.get() : "null",
- batchSize, messageSelector.isEmpty() ? null : messageSelector);
+ batchSize, messageSelector.isEmpty() ? null : messageSelector);
logger.info(startupMsg);
}
@@ -120,23 +118,23 @@ class JMSMessageConsumer {
List<Event> result = new ArrayList<Event>(batchSize);
Message message;
message = messageConsumer.receive(pollTimeout);
- if(message != null) {
+ if (message != null) {
result.addAll(messageConverter.convert(message));
int max = batchSize - 1;
for (int i = 0; i < max; i++) {
message = messageConsumer.receiveNoWait();
- if(message == null) {
+ if (message == null) {
break;
}
result.addAll(messageConverter.convert(message));
}
}
- if(logger.isDebugEnabled()) {
- logger.debug(String.format("Took batch of %s from %s", result.size(),
- destination));
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Took batch of %s from %s", result.size(), destination));
}
return result;
}
+
void commit() {
try {
session.commit();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
index af74bf4..9747a31 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
@@ -22,15 +22,15 @@ import javax.naming.InitialContext;
import com.google.common.base.Optional;
-
public class JMSMessageConsumerFactory {
JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory connectionFactory,
- String destinationName, JMSDestinationType destinationType, JMSDestinationLocator destinationLocator,
- String messageSelector, int batchSize, long pollTimeout, JMSMessageConverter messageConverter,
- Optional<String> userName, Optional<String> password) {
+ String destinationName, JMSDestinationType destinationType,
+ JMSDestinationLocator destinationLocator, String messageSelector, int batchSize,
+ long pollTimeout, JMSMessageConverter messageConverter,
+ Optional<String> userName, Optional<String> password) {
return new JMSMessageConsumer(initialContext, connectionFactory, destinationName,
- destinationLocator, destinationType, messageSelector, batchSize, pollTimeout,
+ destinationLocator, destinationType, messageSelector, batchSize, pollTimeout,
messageConverter, userName, password);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index c1cc9cf..7631827 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -76,40 +76,39 @@ public class JMSSource extends AbstractPollableSource {
private int jmsExceptionCounter;
private InitialContext initialContext;
-
public JMSSource() {
this(new JMSMessageConsumerFactory(), new InitialContextFactory());
}
+
@VisibleForTesting
- public JMSSource(JMSMessageConsumerFactory consumerFactory, InitialContextFactory initialContextFactory) {
+ public JMSSource(JMSMessageConsumerFactory consumerFactory,
+ InitialContextFactory initialContextFactory) {
super();
this.consumerFactory = consumerFactory;
this.initialContextFactory = initialContextFactory;
-
}
@Override
protected void doConfigure(Context context) throws FlumeException {
sourceCounter = new SourceCounter(getName());
- initialContextFactoryName = context.getString(JMSSourceConfiguration.
- INITIAL_CONTEXT_FACTORY, "").trim();
+ initialContextFactoryName = context.getString(
+ JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim();
- providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "")
- .trim();
+ providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim();
- destinationName = context.getString(JMSSourceConfiguration.
- DESTINATION_NAME, "").trim();
+ destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim();
- String destinationTypeName = context.getString(JMSSourceConfiguration.
- DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH);
+ String destinationTypeName = context.getString(
+ JMSSourceConfiguration.DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH);
- String destinationLocatorName = context.getString(JMSSourceConfiguration.
- DESTINATION_LOCATOR, JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT)
- .trim().toUpperCase(Locale.ENGLISH);
+ String destinationLocatorName = context.getString(
+ JMSSourceConfiguration.DESTINATION_LOCATOR,
+ JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT)
+ .trim().toUpperCase(Locale.ENGLISH);
- messageSelector = context.getString(JMSSourceConfiguration.
- MESSAGE_SELECTOR, "").trim();
+ messageSelector = context.getString(
+ JMSSourceConfiguration.MESSAGE_SELECTOR, "").trim();
batchSize = context.getInteger(JMSSourceConfiguration.BATCH_SIZE,
JMSSourceConfiguration.BATCH_SIZE_DEFAULT);
@@ -117,16 +116,14 @@ public class JMSSource extends AbstractPollableSource {
errorThreshold = context.getInteger(JMSSourceConfiguration.ERROR_THRESHOLD,
JMSSourceConfiguration.ERROR_THRESHOLD_DEFAULT);
- userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.
- USERNAME));
+ userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.USERNAME));
pollTimeout = context.getLong(JMSSourceConfiguration.POLL_TIMEOUT,
JMSSourceConfiguration.POLL_TIMEOUT_DEFAULT);
- String passwordFile = context.getString(JMSSourceConfiguration.
- PASSWORD_FILE, "").trim();
+ String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim();
- if(passwordFile.isEmpty()) {
+ if (passwordFile.isEmpty()) {
password = Optional.of("");
} else {
try {
@@ -140,45 +137,38 @@ public class JMSSource extends AbstractPollableSource {
String converterClassName = context.getString(
JMSSourceConfiguration.CONVERTER_TYPE,
- JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT)
- .trim();
- if(JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.
- equalsIgnoreCase(converterClassName)) {
+ JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT).trim();
+ if (JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.equalsIgnoreCase(converterClassName)) {
converterClassName = DefaultJMSMessageConverter.Builder.class.getName();
}
- Context converterContext = new Context(context.
- getSubProperties(JMSSourceConfiguration.CONVERTER + "."));
+ Context converterContext = new Context(context.getSubProperties(
+ JMSSourceConfiguration.CONVERTER + "."));
try {
@SuppressWarnings("rawtypes")
Class clazz = Class.forName(converterClassName);
boolean isBuilder = JMSMessageConverter.Builder.class
.isAssignableFrom(clazz);
- if(isBuilder) {
- JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder)
- clazz.newInstance();
+ if (isBuilder) {
+ JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder)clazz.newInstance();
converter = builder.build(converterContext);
} else {
- Preconditions.checkState(JMSMessageConverter.class.
- isAssignableFrom(clazz), String.
- format("Class %s is not a subclass of JMSMessageConverter",
- clazz.getName()));
+ Preconditions.checkState(JMSMessageConverter.class.isAssignableFrom(clazz),
+ String.format("Class %s is not a subclass of JMSMessageConverter", clazz.getName()));
converter = (JMSMessageConverter)clazz.newInstance();
- boolean configured = Configurables.configure(converter,
- converterContext);
- if(logger.isDebugEnabled()) {
- logger.debug(String.
- format("Attempted configuration of %s, result = %s",
- converterClassName, String.valueOf(configured)));
+ boolean configured = Configurables.configure(converter, converterContext);
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Attempted configuration of %s, result = %s",
+ converterClassName, String.valueOf(configured)));
}
}
- } catch(Exception e) {
+ } catch (Exception e) {
throw new FlumeException(String.format(
"Unable to create instance of converter %s", converterClassName), e);
}
- String connectionFactoryName = context.getString(JMSSourceConfiguration.
- CONNECTION_FACTORY, JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT)
- .trim();
+ String connectionFactoryName = context.getString(
+ JMSSourceConfiguration.CONNECTION_FACTORY,
+ JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim();
assertNotEmpty(initialContextFactoryName, String.format(
"Initial Context Factory is empty. This is specified by %s",
@@ -210,8 +200,7 @@ public class JMSSource extends AbstractPollableSource {
"invalid.", destinationLocatorName), e);
}
- Preconditions.checkArgument(batchSize > 0, "Batch size must be greater " +
- "than 0");
+ Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than 0");
try {
Properties contextProperties = new Properties();
@@ -223,12 +212,12 @@ public class JMSSource extends AbstractPollableSource {
// Provide properties for connecting via JNDI
if (this.userName.isPresent()) {
- contextProperties.setProperty(
- javax.naming.Context.SECURITY_PRINCIPAL, this.userName.get());
+ contextProperties.setProperty(javax.naming.Context.SECURITY_PRINCIPAL,
+ this.userName.get());
}
if (this.password.isPresent()) {
- contextProperties.setProperty(
- javax.naming.Context.SECURITY_CREDENTIALS, this.password.get());
+ contextProperties.setProperty(javax.naming.Context.SECURITY_CREDENTIALS,
+ this.password.get());
}
initialContext = initialContextFactory.create(contextProperties);
@@ -239,28 +228,26 @@ public class JMSSource extends AbstractPollableSource {
}
try {
- connectionFactory = (ConnectionFactory) initialContext.
- lookup(connectionFactoryName);
+ connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
} catch (NamingException e) {
throw new FlumeException("Could not lookup ConnectionFactory", e);
}
}
private void assertNotEmpty(String arg, String msg) {
- Preconditions.checkArgument(!arg.isEmpty(),
- msg);
+ Preconditions.checkArgument(!arg.isEmpty(), msg);
}
@Override
protected synchronized Status doProcess() throws EventDeliveryException {
boolean error = true;
try {
- if(consumer == null) {
+ if (consumer == null) {
consumer = createConsumer();
}
List<Event> events = consumer.take();
int size = events.size();
- if(size == 0) {
+ if (size == 0) {
error = false;
return Status.BACKOFF;
}
@@ -275,28 +262,28 @@ public class JMSSource extends AbstractPollableSource {
logger.warn("Error appending event to channel. "
+ "Channel might be full. Consider increasing the channel "
+ "capacity or make sure the sinks perform faster.", channelException);
- } catch(JMSException jmsException) {
+ } catch (JMSException jmsException) {
logger.warn("JMSException consuming events", jmsException);
- if(++jmsExceptionCounter > errorThreshold) {
- if(consumer != null) {
+ if (++jmsExceptionCounter > errorThreshold) {
+ if (consumer != null) {
logger.warn("Exceeded JMSException threshold, closing consumer");
consumer.rollback();
consumer.close();
consumer = null;
}
}
- } catch(Throwable throwable) {
+ } catch (Throwable throwable) {
logger.error("Unexpected error processing events", throwable);
- if(throwable instanceof Error) {
+ if (throwable instanceof Error) {
throw (Error) throwable;
}
} finally {
- if(error) {
- if(consumer != null) {
+ if (error) {
+ if (consumer != null) {
consumer.rollback();
}
} else {
- if(consumer != null) {
+ if (consumer != null) {
consumer.commit();
jmsExceptionCounter = 0;
}
@@ -304,6 +291,7 @@ public class JMSSource extends AbstractPollableSource {
}
return Status.BACKOFF;
}
+
@Override
protected synchronized void doStart() {
try {
@@ -317,18 +305,18 @@ public class JMSSource extends AbstractPollableSource {
@Override
protected synchronized void doStop() {
- if(consumer != null) {
+ if (consumer != null) {
consumer.close();
consumer = null;
}
sourceCounter.stop();
}
+
private JMSMessageConsumer createConsumer() throws JMSException {
logger.info("Creating new consumer for " + destinationName);
JMSMessageConsumer consumer = consumerFactory.create(initialContext,
- connectionFactory, destinationName, destinationType, destinationLocator,
- messageSelector, batchSize,
- pollTimeout, converter, userName, password);
+ connectionFactory, destinationName, destinationType, destinationLocator,
+ messageSelector, batchSize, pollTimeout, converter, userName, password);
jmsExceptionCounter = 0;
return consumer;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 84fef52..90e4715 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -112,18 +112,24 @@ public class KafkaSource extends AbstractPollableSource
*/
public abstract class Subscriber<T> {
public abstract void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener);
- public T get() {return null;}
+
+ public T get() {
+ return null;
+ }
}
private class TopicListSubscriber extends Subscriber<List<String>> {
private List<String> topicList;
+
public TopicListSubscriber(String commaSeparatedTopics) {
this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$"));
}
+
@Override
public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
consumer.subscribe(topicList, listener);
}
+
@Override
public List<String> get() {
return topicList;
@@ -132,13 +138,16 @@ public class KafkaSource extends AbstractPollableSource
private class PatternSubscriber extends Subscriber<Pattern> {
private Pattern pattern;
+
public PatternSubscriber(String regex) {
this.pattern = Pattern.compile(regex);
}
+
@Override
public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
consumer.subscribe(pattern, listener);
}
+
@Override
public Pattern get() {
return pattern;
@@ -232,10 +241,11 @@ public class KafkaSource extends AbstractPollableSource
}
if (log.isDebugEnabled()) {
- log.debug("Topic: {} Partition: {} Message: {}", new String[]{
- message.topic(),
- String.valueOf(message.partition()),
- new String(eventBody)});
+ log.debug("Topic: {} Partition: {} Message: {}", new String[] {
+ message.topic(),
+ String.valueOf(message.partition()),
+ new String(eventBody)
+ });
}
event = EventBuilder.withBody(eventBody, headers);
@@ -305,21 +315,21 @@ public class KafkaSource extends AbstractPollableSource
if (topicProperty != null && !topicProperty.isEmpty()) {
// create subscriber that uses pattern-based subscription
subscriber = new PatternSubscriber(topicProperty);
- } else
- if((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && !topicProperty.isEmpty()) {
+ } else if ((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null &&
+ !topicProperty.isEmpty()) {
// create subscriber that uses topic list subscription
subscriber = new TopicListSubscriber(topicProperty);
- } else
- if (subscriber == null) {
+ } else if (subscriber == null) {
throw new ConfigurationException("At least one Kafka topic must be specified.");
}
batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
- KafkaSourceConstants.DEFAULT_BATCH_SIZE);
+ KafkaSourceConstants.DEFAULT_BATCH_SIZE);
maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
- KafkaSourceConstants.DEFAULT_BATCH_DURATION);
+ KafkaSourceConstants.DEFAULT_BATCH_DURATION);
- useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT);
+ useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT,
+ KafkaSourceConstants.DEFAULT_AVRO_EVENT);
if (log.isDebugEnabled()) {
log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
@@ -337,7 +347,6 @@ public class KafkaSource extends AbstractPollableSource
}
}
-
// We can remove this once the properties are officially deprecated
private void translateOldProperties(Context ctx) {
// topic
@@ -358,16 +367,18 @@ public class KafkaSource extends AbstractPollableSource
}
}
-
private void setConsumerProps(Context ctx, String bootStrapServers) {
- String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+ String groupId = ctx.getString(
+ KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
if ((groupId == null || groupId.isEmpty()) &&
- kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
- groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
- log.info("Group ID was not specified. Using " + groupId + " as the group id.");
+ kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
+ log.info("Group ID was not specified. Using " + groupId + " as the group id.");
}
- kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
- kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
+ kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
+ kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
//Defaults overridden based on config
kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
//These always take precedence over config
@@ -375,7 +386,8 @@ public class KafkaSource extends AbstractPollableSource
if (groupId != null) {
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
- kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+ kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
log.info(kafkaProps.toString());
}
@@ -426,7 +438,6 @@ public class KafkaSource extends AbstractPollableSource
}
}
-
class SourceRebalanceListener implements ConsumerRebalanceListener {
private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class);
private AtomicBoolean rebalanceFlag;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 9f20f61..1f255f9 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -22,9 +22,12 @@ public class KafkaSourceConstants {
public static final String KAFKA_PREFIX = "kafka.";
public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
- public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
- public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String DEFAULT_KEY_DESERIALIZER =
+ "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String DEFAULT_VALUE_DESERIALIZER =
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+ public static final String BOOTSTRAP_SERVERS =
+ KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
public static final String TOPICS = KAFKA_PREFIX + "topics";
public static final String TOPICS_REGEX = TOPICS + "." + "regex";
public static final String DEFAULT_AUTO_COMMIT = "false";
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 8128df4..1409f25 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -19,6 +19,20 @@
package org.apache.flume.source.taildir;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.google.gson.stream.JsonReader;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.client.avro.ReliableEventReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
@@ -29,21 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.client.avro.ReliableEventReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Table;
-import com.google.gson.stream.JsonReader;
-
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableTaildirEventReader implements ReliableEventReader {
@@ -111,15 +110,15 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
jr.beginObject();
while (jr.hasNext()) {
switch (jr.nextName()) {
- case "inode":
- inode = jr.nextLong();
- break;
- case "pos":
- pos = jr.nextLong();
- break;
- case "file":
- path = jr.nextString();
- break;
+ case "inode":
+ inode = jr.nextLong();
+ break;
+ case "pos":
+ pos = jr.nextLong();
+ break;
+ case "file":
+ path = jr.nextString();
+ break;
}
}
jr.endObject();
@@ -238,7 +237,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
long startPos = skipToEnd ? f.length() : 0;
tf = openFile(f, headers, inode, startPos);
- } else{
+ } else {
boolean updated = tf.getLastUpdated() < f.lastModified();
if (updated) {
if (tf.getRaf() == null) {
@@ -320,7 +319,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
}
public ReliableTaildirEventReader build() throws IOException {
- return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching);
+ return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
+ addByteOffset, cachePatternMatching);
}
}