You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/07 00:30:12 UTC
svn commit: r1358458 - in /incubator/flume/branches/branch-1.2.0: conf/
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/
flume-ng-core/src/main/java/org/apache/flume/channel/
flume-ng-core/src/main/java/org/apache/flume...
Author: mpercy
Date: Fri Jul 6 22:30:11 2012
New Revision: 1358458
URL: http://svn.apache.org/viewvc?rev=1358458&view=rev
Log:
FLUME-748. Create metric collection infrastructure.
(Arvind Prabhakar via Mike Percy)
Added:
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
Modified:
incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template
incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
Modified: incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template (original)
+++ incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template Fri Jul 6 22:30:11 2012
@@ -21,8 +21,8 @@
#JAVA_HOME=/usr/lib/jvm/java-6-sun
-# Give Flume more memory and pre-allocate
-#JAVA_OPTS="-Xms100m -Xmx200m"
+# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
+#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Fri Jul 6 22:30:11 2012
@@ -34,6 +34,7 @@ import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.Log.Builder;
+import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +85,7 @@ public class FileChannel extends BasicCh
private int logWriteTimeout;
private int checkpointWriteTimeout;
private String channelNameDescriptor = "[channel=unknown]";
+ private ChannelCounter channelCounter;
@Override
public synchronized void setName(String name) {
@@ -202,6 +204,10 @@ public class FileChannel extends BasicCh
log.setCheckpointInterval(checkpointInterval);
log.setMaxFileSize(maxFileSize);
}
+
+ if (channelCounter == null) {
+ channelCounter = new ChannelCounter(getName());
+ }
}
@Override
@@ -231,13 +237,22 @@ public class FileChannel extends BasicCh
open = false;
LOG.error("Failed to start the file channel", ex);
}
+ if (open) {
+ channelCounter.start();
+ channelCounter.setChannelSize(getDepth());
+ }
super.start();
}
@Override
public synchronized void stop() {
LOG.info("Stopping {}...", this);
+ int size = getDepth();
close();
+ if (!open) {
+ channelCounter.setChannelSize(size);
+ channelCounter.stop();
+ }
super.stop();
}
@@ -256,7 +271,8 @@ public class FileChannel extends BasicCh
trans.getStateAsString() + channelNameDescriptor);
}
trans = new FileBackedTransaction(log, TRANSACTION_ID.incrementAndGet(),
- transactionCapacity, keepAlive, queueRemaining, getName());
+ transactionCapacity, keepAlive, queueRemaining, getName(),
+ channelCounter);
transactions.set(trans);
return trans;
}
@@ -294,9 +310,10 @@ public class FileChannel extends BasicCh
private final FlumeEventQueue queue;
private final Semaphore queueRemaining;
private final String channelNameDescriptor;
+ private final ChannelCounter channelCounter;
public FileBackedTransaction(Log log, long transactionID,
int transCapacity, int keepAlive, Semaphore queueRemaining,
- String name) {
+ String name, ChannelCounter counter) {
this.log = log;
queue = log.getFlumeEventQueue();
this.transactionID = transactionID;
@@ -305,6 +322,7 @@ public class FileChannel extends BasicCh
putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
channelNameDescriptor = "[channel=" + name + "]";
+ this.channelCounter = counter;
}
private boolean isClosed() {
return State.CLOSED.equals(getState());
@@ -314,6 +332,7 @@ public class FileChannel extends BasicCh
}
@Override
protected void doPut(Event event) throws InterruptedException {
+ channelCounter.incrementEventPutAttemptCount();
if(putList.remainingCapacity() == 0) {
throw new ChannelException("Put queue for FileBackedTransaction " +
"of capacity " + putList.size() + " full, consider " +
@@ -336,6 +355,7 @@ public class FileChannel extends BasicCh
@Override
protected Event doTake() throws InterruptedException {
+ channelCounter.incrementEventTakeAttemptCount();
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for FileBackedTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
@@ -384,6 +404,7 @@ public class FileChannel extends BasicCh
}
try {
log.commitPut(transactionID);
+ channelCounter.addToEventPutSuccessCount(puts);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
@@ -391,6 +412,7 @@ public class FileChannel extends BasicCh
} else if(takes > 0) {
try {
log.commitTake(transactionID);
+ channelCounter.addToEventTakeSuccessCount(takes);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
@@ -399,6 +421,7 @@ public class FileChannel extends BasicCh
}
putList.clear();
takeList.clear();
+ channelCounter.setChannelSize(queue.getSize());
}
@Override
@@ -423,6 +446,7 @@ public class FileChannel extends BasicCh
}
putList.clear();
takeList.clear();
+ channelCounter.setChannelSize(queue.getSize());
}
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Jul 6 22:30:11 2012
@@ -24,9 +24,10 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
+import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.ChannelException;
+import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,14 +42,18 @@ public class MemoryChannel extends Basic
public class MemoryTransaction extends BasicTransactionSemantics {
private LinkedBlockingDeque<Event> takeList;
private LinkedBlockingDeque<Event> putList;
+ private final ChannelCounter channelCounter;
- public MemoryTransaction(int transCapacity) {
+ public MemoryTransaction(int transCapacity, ChannelCounter counter) {
putList = new LinkedBlockingDeque<Event>(transCapacity);
takeList = new LinkedBlockingDeque<Event>(transCapacity);
+
+ channelCounter = counter;
}
@Override
protected void doPut(Event event) {
+ channelCounter.incrementEventPutAttemptCount();
if(!putList.offer(event)) {
throw new ChannelException("Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
@@ -58,6 +63,7 @@ public class MemoryChannel extends Basic
@Override
protected Event doTake() throws InterruptedException {
+ channelCounter.incrementEventTakeAttemptCount();
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
@@ -87,6 +93,7 @@ public class MemoryChannel extends Basic
}
}
int puts = putList.size();
+ int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
@@ -102,7 +109,14 @@ public class MemoryChannel extends Basic
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
+ if (puts > 0) {
+ channelCounter.addToEventPutSuccessCount(puts);
+ }
+ if (takes > 0) {
+ channelCounter.addToEventTakeSuccessCount(takes);
+ }
+ channelCounter.setChannelSize(queue.size());
}
@Override
@@ -117,6 +131,7 @@ public class MemoryChannel extends Basic
putList.clear();
}
queueStored.release(takes);
+ channelCounter.setChannelSize(queue.size());
}
}
@@ -140,6 +155,7 @@ public class MemoryChannel extends Basic
// maximum items in a transaction queue
private volatile Integer transCapacity;
private volatile int keepAlive;
+ private ChannelCounter channelCounter;
public MemoryChannel() {
@@ -193,6 +209,10 @@ public class MemoryChannel extends Basic
queueStored = new Semaphore(0);
}
}
+
+ if (channelCounter == null) {
+ channelCounter = new ChannelCounter(getName());
+ }
}
private void resizeQueue(int capacity) throws InterruptedException {
@@ -224,7 +244,21 @@ public class MemoryChannel extends Basic
}
@Override
+ public synchronized void start() {
+ channelCounter.start();
+ channelCounter.setChannelSize(queue.size());
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ channelCounter.setChannelSize(queue.size());
+ channelCounter.stop();
+ super.stop();
+ }
+
+ @Override
protected BasicTransactionSemantics createTransaction() {
- return new MemoryTransaction(transCapacity);
+ return new MemoryTransaction(transCapacity, channelCounter);
}
}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class ChannelCounter extends MonitoredCounterGroup implements
+ ChannelCounterMBean {
+
+ private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
+
+ private static final String COUNTER_EVENT_PUT_ATTEMPT =
+ "channel.event.put.attempt";
+
+ private static final String COUNTER_EVENT_TAKE_ATTEMPT =
+ "channel.event.take.attempt";
+
+ private static final String COUNTER_EVENT_PUT_SUCCESS =
+ "channel.event.put.success";
+
+ private static final String COUNTER_EVENT_TAKE_SUCCESS =
+ "channel.event.take.success";
+
+ private static final String[] ATTRIBUTES = {
+ COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
+ COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
+ COUNTER_EVENT_TAKE_SUCCESS
+ };
+
+ public ChannelCounter(String name) {
+ super(MonitoredCounterGroup.Type.CHANNEL, name, ATTRIBUTES);
+ }
+
+ @Override
+ public long getChannelSize() {
+ return get(COUNTER_CHANNEL_SIZE);
+ }
+
+ public void setChannelSize(long newSize) {
+ set(COUNTER_CHANNEL_SIZE, newSize);
+ }
+
+ @Override
+ public long getEventPutAttemptCount() {
+ return get(COUNTER_EVENT_PUT_ATTEMPT);
+ }
+
+ public long incrementEventPutAttemptCount() {
+ return increment(COUNTER_EVENT_PUT_ATTEMPT);
+ }
+
+ @Override
+ public long getEventTakeAttemptCount() {
+ return get(COUNTER_EVENT_TAKE_ATTEMPT);
+ }
+
+ public long incrementEventTakeAttemptCount() {
+ return increment(COUNTER_EVENT_TAKE_ATTEMPT);
+ }
+
+ @Override
+ public long getEventPutSuccessCount() {
+ return get(COUNTER_EVENT_PUT_SUCCESS);
+ }
+
+ public long addToEventPutSuccessCount(long delta) {
+ return addAndGet(COUNTER_EVENT_PUT_SUCCESS, delta);
+ }
+
+ @Override
+ public long getEventTakeSuccessCount() {
+ return get(COUNTER_EVENT_TAKE_SUCCESS);
+ }
+
+ public long addToEventTakeSuccessCount(long delta) {
+ return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta);
+ }
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface ChannelCounterMBean {
+
+ long getChannelSize();
+
+ long getEventPutAttemptCount();
+
+ long getEventTakeAttemptCount();
+
+ long getEventPutSuccessCount();
+
+ long getEventTakeSuccessCount();
+
+ long getStartTime();
+
+ long getStopTime();
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class ChannelProcessorCounter extends MonitoredCounterGroup {
+
+ protected ChannelProcessorCounter(String name) {
+ super(MonitoredCounterGroup.Type.CHANNEL_PROCESSOR, name);
+ }
+
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class MonitoredCounterGroup {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MonitoredCounterGroup.class);
+
+ private final Type type;
+ private final String name;
+ private final Map<String, AtomicLong> counterMap;
+
+ private AtomicLong startTime;
+ private AtomicLong stopTime;
+
+
+ protected MonitoredCounterGroup(Type type, String name, String... attrs) {
+ this.type = type;
+ this.name = name;
+
+ Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>();
+
+ // Initialize the counters
+ for (String attribute : attrs) {
+ counterInitMap.put(attribute, new AtomicLong(0L));
+ }
+
+ counterMap = Collections.unmodifiableMap(counterInitMap);
+
+ startTime = new AtomicLong(0L);
+ stopTime = new AtomicLong(0L);
+
+ try {
+ ObjectName objName = new ObjectName("org.apache.flume."
+ + type.name().toLowerCase() + ":type=" + this.name);
+
+ ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
+
+ LOG.info("Monitoried counter group for type: " + type + ", name: " + name
+ + ", registered successfully.");
+ } catch (Exception ex) {
+ LOG.error("Failed to register monitored counter group for type: "
+ + type + ", name: " + name, ex);
+ }
+ }
+
+ public void start() {
+ stopTime.set(0L);
+ for (String counter : counterMap.keySet()) {
+ counterMap.get(counter).set(0L);
+ }
+ startTime.set(System.currentTimeMillis());
+ LOG.info("Component type: " + type + ", name: " + name + " started");
+ }
+
+ public void stop() {
+ stopTime.set(System.currentTimeMillis());
+ LOG.info("Component type: " + type + ", name: " + name + " stopped");
+ }
+
+ public long getStartTime() {
+ return startTime.get();
+ }
+
+ public long getStopTime() {
+ return stopTime.get();
+ }
+
+ @Override
+ public final String toString() {
+ StringBuilder sb = new StringBuilder(type.name()).append(":");
+ sb.append(name).append("{");
+ boolean first = true;
+ Iterator<String> counterIterator = counterMap.keySet().iterator();
+ while (counterIterator.hasNext()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ String counterName = counterIterator.next();
+ sb.append(counterName).append("=").append(get(counterName));
+ }
+ sb.append("}");
+
+ return sb.toString();
+ }
+
+
+ protected long get(String counter) {
+ return counterMap.get(counter).get();
+ }
+
+ protected void set(String counter, long value) {
+ counterMap.get(counter).set(value);
+ }
+
+ protected long addAndGet(String counter, long delta) {
+ return counterMap.get(counter).addAndGet(delta);
+ }
+
+ protected long increment(String counter) {
+ return counterMap.get(counter).incrementAndGet();
+ }
+
+ public static enum Type {
+ SOURCE,
+ CHANNEL_PROCESSOR,
+ CHANNEL,
+ SINK_PROCESSOR,
+ SINK
+ };
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,138 @@
+/*
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SinkCounter extends MonitoredCounterGroup implements
+ SinkCounterMBean {
+
+ private static final String COUNTER_CONNECTION_CREATED =
+ "sink.connection.creation.count";
+
+ private static final String COUNTER_CONNECTION_CLOSED =
+ "sink.connection.closed.count";
+
+ private static final String COUNTER_CONNECTION_FAILED =
+ "sink.connection.failed.count";
+
+ private static final String COUNTER_BATCH_EMPTY =
+ "sink.batch.empty";
+
+ private static final String COUNTER_BATCH_UNDERFLOW =
+ "sink.batch.underflow";
+
+ private static final String COUNTER_BATCH_COMPLETE =
+ "sink.batch.complete";
+
+ private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
+ "sink.event.drain.attempt";
+
+ private static final String COUNTER_EVENT_DRAIN_SUCCESS =
+ "sink.event.drain.sucess";
+
+ private static final String[] ATTRIBUTES = {
+ COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
+ COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
+ COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
+ COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
+ };
+
+
+ public SinkCounter(String name) {
+ super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
+ }
+
+ @Override
+ public long getConnectionCreatedCount() {
+ return get(COUNTER_CONNECTION_CREATED);
+ }
+
+ public long incrementConnectionCreatedCount() {
+ return increment(COUNTER_CONNECTION_CREATED);
+ }
+
+ @Override
+ public long getConnectionClosedCount() {
+ return get(COUNTER_CONNECTION_CLOSED);
+ }
+
+ public long incrementConnectionClosedCount() {
+ return increment(COUNTER_CONNECTION_CLOSED);
+ }
+
+ @Override
+ public long getConnectionFailedCount() {
+ return get(COUNTER_CONNECTION_FAILED);
+ }
+
+ public long incrementConnectionFailedCount() {
+ return increment(COUNTER_CONNECTION_FAILED);
+ }
+
+ @Override
+ public long getBatchEmptyCount() {
+ return get(COUNTER_BATCH_EMPTY);
+ }
+
+ public long incrementBatchEmptyCount() {
+ return increment(COUNTER_BATCH_EMPTY);
+ }
+
+ @Override
+ public long getBatchUnderflowCount() {
+ return get(COUNTER_BATCH_UNDERFLOW);
+ }
+
+ public long incrementBatchUnderflowCount() {
+ return increment(COUNTER_BATCH_UNDERFLOW);
+ }
+
+ @Override
+ public long getBatchCompleteCount() {
+ return get(COUNTER_BATCH_COMPLETE);
+ }
+
+ public long incrementBatchCompleteCount() {
+ return increment(COUNTER_BATCH_COMPLETE);
+ }
+
+ @Override
+ public long getEventDrainAttemptCount() {
+ return get(COUNTER_EVENT_DRAIN_ATTEMPT);
+ }
+
+ public long incrementEventDrainAttemptCount() {
+ return increment(COUNTER_EVENT_DRAIN_ATTEMPT);
+ }
+
+ public long addToEventDrainAttemptCount(long delta) {
+ return addAndGet(COUNTER_EVENT_DRAIN_ATTEMPT, delta);
+ }
+
+ @Override
+ public long getEventDrainSuccessCount() {
+ return get(COUNTER_EVENT_DRAIN_SUCCESS);
+ }
+
+ public long incrementEventDrainSuccessCount() {
+ return increment(COUNTER_EVENT_DRAIN_SUCCESS);
+ }
+
+ public long addToEventDrainSuccessCount(long delta) {
+ return addAndGet(COUNTER_EVENT_DRAIN_SUCCESS, delta);
+ }
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface SinkCounterMBean {
+
+ long getConnectionCreatedCount();
+
+ long getConnectionClosedCount();
+
+ long getConnectionFailedCount();
+
+ long getBatchEmptyCount();
+
+ long getBatchUnderflowCount();
+
+ long getBatchCompleteCount();
+
+ long getEventDrainAttemptCount();
+
+ long getEventDrainSuccessCount();
+
+ long getStartTime();
+
+ long getStopTime();
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SinkProcessorCounter extends MonitoredCounterGroup {
+
+ protected SinkProcessorCounter(String name) {
+ super(MonitoredCounterGroup.Type.SINK_PROCESSOR, name);
+ }
+
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SourceCounter extends MonitoredCounterGroup implements
+ SourceCounterMBean {
+
+ private static final String COUNTER_EVENTS_RECEIVED =
+ "src.events.received";
+ private static final String COUNTER_EVENTS_ACCEPTED =
+ "src.events.accepted";
+
+ private static final String COUNTER_APPEND_RECEIVED =
+ "src.append.received";
+ private static final String COUNTER_APPEND_ACCEPTED =
+ "src.append.accepted";
+
+ private static final String COUNTER_APPEND_BATCH_RECEIVED =
+ "src.append-batch.received";
+ private static final String COUNTER_APPEND_BATCH_ACCEPTED =
+ "src.append-batch.accepted";
+
+
+ private static final String[] ATTRIBUTES =
+ {
+ COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
+ COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
+ COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED
+ };
+
+
+ public SourceCounter(String name) {
+ super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
+ }
+
+ @Override
+ public long getEventReceivedCount() {
+ return get(COUNTER_EVENTS_RECEIVED);
+ }
+
+ public long incrementEventReceivedCount() {
+ return increment(COUNTER_EVENTS_RECEIVED);
+ }
+
+ public long addToEventReceivedCount(long delta) {
+ return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
+ }
+
+ @Override
+ public long getEventAcceptedCount() {
+ return get(COUNTER_EVENTS_ACCEPTED);
+ }
+
+ public long incrementEventAcceptedCount() {
+ return increment(COUNTER_EVENTS_ACCEPTED);
+ }
+
+ public long addToEventAcceptedCount(long delta) {
+ return addAndGet(COUNTER_EVENTS_ACCEPTED, delta);
+ }
+
+ @Override
+ public long getAppendReceivedCount() {
+ return get(COUNTER_APPEND_RECEIVED);
+ }
+
+ public long incrementAppendReceivedCount() {
+ return increment(COUNTER_APPEND_RECEIVED);
+ }
+
+ @Override
+ public long getAppendAcceptedCount() {
+ return get(COUNTER_APPEND_ACCEPTED);
+ }
+
+ public long incrementAppendAcceptedCount() {
+ return increment(COUNTER_APPEND_ACCEPTED);
+ }
+
+ @Override
+ public long getAppendBatchReceivedCount() {
+ return get(COUNTER_APPEND_BATCH_RECEIVED);
+ }
+
+ public long incrementAppendBatchReceivedCount() {
+ return increment(COUNTER_APPEND_BATCH_RECEIVED);
+ }
+
+ @Override
+ public long getAppendBatchAcceptedCount() {
+ return get(COUNTER_APPEND_BATCH_ACCEPTED);
+ }
+
+ public long incrementAppendBatchAcceptedCount() {
+ return increment(COUNTER_APPEND_BATCH_ACCEPTED);
+ }
+}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface SourceCounterMBean {
+
+ long getEventReceivedCount();
+
+ long getEventAcceptedCount();
+
+ long getAppendReceivedCount();
+
+ long getAppendAcceptedCount();
+
+ long getAppendBatchReceivedCount();
+
+ long getAppendBatchAcceptedCount();
+
+ long getStartTime();
+
+ long getStopTime();
+}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Jul 6 22:30:11 2012
@@ -20,27 +20,27 @@
package org.apache.flume.sink;
import java.util.List;
+import java.util.Properties;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.source.AvroSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import java.util.Properties;
-import org.apache.flume.api.RpcClientConfigurationConstants;
/**
* <p>
@@ -125,12 +125,8 @@ public class AvroSink extends AbstractSi
private Integer port;
private RpcClient client;
- private CounterGroup counterGroup;
private Properties clientProps;
-
- public AvroSink() {
- counterGroup = new CounterGroup();
- }
+ private SinkCounter sinkCounter;
@Override
public void configure(Context context) {
@@ -165,6 +161,10 @@ public class AvroSink extends AbstractSi
RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
String.valueOf(requestTimeout));
}
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
}
/**
@@ -178,8 +178,16 @@ public class AvroSink extends AbstractSi
logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
"port: {}",
new Object[] { getName(), hostname, port });
-
+ try {
client = RpcClientFactory.getInstance(clientProps);
+ } catch (Exception ex) {
+ sinkCounter.incrementConnectionFailedCount();
+ if (ex instanceof FlumeException) {
+ throw (FlumeException) ex;
+ } else {
+ throw new FlumeException(ex);
+ }
+ }
logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
}
@@ -190,7 +198,9 @@ public class AvroSink extends AbstractSi
logger.debug("Avro sink {} closing avro client: {}", getName(), client);
try {
client.close();
+ sinkCounter.incrementConnectionClosedCount();
} catch (FlumeException e) {
+ sinkCounter.incrementConnectionFailedCount();
logger.error("Avro sink " + getName() + ": Attempt to close avro " +
"client failed. Exception follows.", e);
}
@@ -226,7 +236,7 @@ public class AvroSink extends AbstractSi
@Override
public void start() {
logger.info("Starting {}...", this);
-
+ sinkCounter.start();
try {
createConnection();
} catch (FlumeException e) {
@@ -247,10 +257,10 @@ public class AvroSink extends AbstractSi
logger.info("Avro sink {} stopping...", getName());
destroyConnection();
-
+ sinkCounter.stop();
super.stop();
- logger.info("Avro sink {} stopped. Metrics: {}", getName(), counterGroup);
+ logger.info("Avro sink {} stopped. Metrics: {}", getName(), sinkCounter);
}
@Override
@@ -276,26 +286,33 @@ public class AvroSink extends AbstractSi
Event event = channel.take();
if (event == null) {
- counterGroup.incrementAndGet("batch.underflow");
break;
}
batch.add(event);
}
- if (batch.isEmpty()) {
- counterGroup.incrementAndGet("batch.empty");
+ int size = batch.size();
+ int batchSize = client.getBatchSize();
+
+ if (size == 0) {
+ sinkCounter.incrementBatchEmptyCount();
status = Status.BACKOFF;
} else {
+ if (size < batchSize) {
+ sinkCounter.incrementBatchUnderflowCount();
+ } else {
+ sinkCounter.incrementBatchCompleteCount();
+ }
+ sinkCounter.addToEventDrainAttemptCount(size);
client.appendBatch(batch);
}
transaction.commit();
- counterGroup.incrementAndGet("batch.success");
+ sinkCounter.addToEventDrainSuccessCount(size);
} catch (Throwable t) {
transaction.rollback();
- counterGroup.incrementAndGet("batch.failure");
if (t instanceof Error) {
throw (Error) t;
} else if (t instanceof ChannelException) {
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Jul 6 22:30:11 2012
@@ -33,12 +33,12 @@ import org.apache.avro.ipc.specific.Spec
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Source;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
@@ -113,14 +113,10 @@ public class AvroSource extends Abstract
private String bindAddress;
private Server server;
- private CounterGroup counterGroup;
+ private SourceCounter sourceCounter;
private int maxThreads;
- public AvroSource() {
- counterGroup = new CounterGroup();
- }
-
@Override
public void configure(Context context) {
port = Integer.parseInt(context.getString("port"));
@@ -131,6 +127,10 @@ public class AvroSource extends Abstract
logger.warn("AVRO source\'s \"threads\" property must specify an integer value.",
context.getString(THREADS));
}
+
+ if (sourceCounter == null) {
+ sourceCounter = new SourceCounter(getName());
+ }
}
@Override
@@ -149,7 +149,7 @@ public class AvroSource extends Abstract
}
server.start();
-
+ sourceCounter.start();
super.start();
logger.info("Avro source {} started.", getName());
@@ -167,10 +167,11 @@ public class AvroSource extends Abstract
logger.info("Avro source " + getName() + ": Interrupted while waiting " +
"for Avro server to stop. Exiting. Exception follows.", e);
}
-
+ sourceCounter.stop();
super.stop();
- logger.info("Avro source {} stopped. Metrics: {}", getName(), counterGroup);
+ logger.info("Avro source {} stopped. Metrics: {}", getName(),
+ sourceCounter);
}
@Override
@@ -194,8 +195,10 @@ public class AvroSource extends Abstract
@Override
public Status append(AvroFlumeEvent avroEvent) {
- logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
- counterGroup.incrementAndGet("rpc.received");
+ logger.debug("Avro source {}: Received avro event: {}", getName(),
+ avroEvent);
+ sourceCounter.incrementAppendReceivedCount();
+ sourceCounter.incrementEventReceivedCount();
Event event = EventBuilder.withBody(avroEvent.getBody().array(),
toStringMap(avroEvent.getHeaders()));
@@ -208,7 +211,8 @@ public class AvroSource extends Abstract
return Status.FAILED;
}
- counterGroup.incrementAndGet("rpc.successful");
+ sourceCounter.incrementAppendAcceptedCount();
+ sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@@ -217,14 +221,14 @@ public class AvroSource extends Abstract
public Status appendBatch(List<AvroFlumeEvent> events) {
logger.debug("Avro source {}: Received avro event batch of {} events.",
getName(), events.size());
- counterGroup.incrementAndGet("rpc.batch.received");
+ sourceCounter.incrementAppendBatchReceivedCount();
+ sourceCounter.addToEventReceivedCount(events.size());
List<Event> batch = new ArrayList<Event>();
for (AvroFlumeEvent avroEvent : events) {
Event event = EventBuilder.withBody(avroEvent.getBody().array(),
toStringMap(avroEvent.getHeaders()));
- counterGroup.incrementAndGet("rpc.batch.events");
batch.add(event);
}
@@ -237,7 +241,8 @@ public class AvroSource extends Abstract
return Status.FAILED;
}
- counterGroup.incrementAndGet("rpc.batch.successful");
+ sourceCounter.incrementAppendBatchAcceptedCount();
+ sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java Fri Jul 6 22:30:11 2012
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import junit.framework.Assert;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMonitoredCounterGroup {
+
+ private static final int MAX_BOUNDS = 1000;
+ private static final String ROOT_OBJ_NAME_PREFIX = "org.apache.flume.";
+ private static final String SOURCE_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+ + "source:type=";
+ private static final String CHANNEL_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+ + "channel:type=";
+ private static final String SINK_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+ + "sink:type=";
+
+ private static final String ATTR_START_TIME = "StartTime";
+ private static final String ATTR_STOP_TIME = "StopTime";
+
+ private static final String SRC_ATTR_EVENT_RECEVIED_COUNT =
+ "EventReceivedCount";
+ private static final String SRC_ATTR_EVENT_ACCEPTED_COUNT =
+ "EventAcceptedCount";
+ private static final String SRC_ATTR_APPEND_RECEVIED_COUNT =
+ "AppendReceivedCount";
+ private static final String SRC_ATTR_APPEND_ACCEPTED_COUNT =
+ "AppendAcceptedCount";
+ private static final String SRC_ATTR_APPEND_BATCH_RECEVIED_COUNT =
+ "AppendBatchReceivedCount";
+ private static final String SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT =
+ "AppendBatchAcceptedCount";
+
+
+ private static final String CH_ATTR_CHANNEL_SIZE = "ChannelSize";
+ private static final String CH_ATTR_EVENT_PUT_ATTEMPT =
+ "EventPutAttemptCount";
+ private static final String CH_ATTR_EVENT_TAKE_ATTEMPT =
+ "EventTakeAttemptCount";
+ private static final String CH_ATTR_EVENT_PUT_SUCCESS =
+ "EventPutSuccessCount";
+ private static final String CH_ATTR_EVENT_TAKE_SUCCESS =
+ "EventTakeSuccessCount";
+
+ private static final String SK_ATTR_CONN_CREATED =
+ "ConnectionCreatedCount";
+ private static final String SK_ATTR_CONN_CLOSED =
+ "ConnectionClosedCount";
+ private static final String SK_ATTR_CONN_FAILED =
+ "ConnectionFailedCount";
+ private static final String SK_ATTR_BATCH_EMPTY =
+ "BatchEmptyCount";
+ private static final String SK_ATTR_BATCH_UNDERFLOW =
+ "BatchUnderflowCount";
+ private static final String SK_ATTR_BATCH_COMPLETE =
+ "BatchCompleteCount";
+ private static final String SK_ATTR_EVENT_DRAIN_ATTEMPT =
+ "EventDrainAttemptCount";
+ private static final String SK_ATTR_EVENT_DRAIN_SUCCESS =
+ "EventDrainSuccessCount";
+
+ private MBeanServer mbServer;
+ private Random random;
+
+ @Before
+ public void setUp() {
+ mbServer = ManagementFactory.getPlatformMBeanServer();
+ random = new Random(System.nanoTime());
+ }
+
+ @Test
+ public void testSinkCounter() throws Exception {
+ String name = getRandomName();
+
+ SinkCounter skc = new SinkCounter(name);
+ ObjectName on = new ObjectName(SINK_OBJ_NAME_PREFIX + name);
+ assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+
+ skc.start();
+ long start1 = getStartTime(on);
+
+ Assert.assertTrue("StartTime", start1 != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ int connCreated = random.nextInt(MAX_BOUNDS);
+ int connClosed = random.nextInt(MAX_BOUNDS);
+ int connFailed = random.nextInt(MAX_BOUNDS);
+ int batchEmpty = random.nextInt(MAX_BOUNDS);
+ int batchUnderflow = random.nextInt(MAX_BOUNDS);
+ int batchComplete = random.nextInt(MAX_BOUNDS);
+ int eventDrainAttempt = random.nextInt(MAX_BOUNDS);
+ int eventDrainSuccess = random.nextInt(MAX_BOUNDS);
+
+ for (int i = 0; i<connCreated; i++) {
+ skc.incrementConnectionCreatedCount();
+ }
+ for (int i = 0; i<connClosed; i++) {
+ skc.incrementConnectionClosedCount();
+ }
+ for (int i = 0; i<connFailed; i++) {
+ skc.incrementConnectionFailedCount();
+ }
+ for (int i = 0; i<batchEmpty; i++) {
+ skc.incrementBatchEmptyCount();
+ }
+ for (int i = 0; i<batchUnderflow; i++) {
+ skc.incrementBatchUnderflowCount();
+ }
+ for (int i = 0; i<batchComplete; i++) {
+ skc.incrementBatchCompleteCount();
+ }
+ for (int i = 0; i<eventDrainAttempt; i++) {
+ skc.incrementEventDrainAttemptCount();
+ }
+ for (int i = 0; i<eventDrainSuccess; i++) {
+ skc.incrementEventDrainSuccessCount();
+ }
+
+ assertSkCounterState(on, connCreated, connClosed, connFailed, batchEmpty,
+ batchUnderflow, batchComplete, eventDrainAttempt, eventDrainSuccess);
+
+ skc.stop();
+
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+ assertSkCounterState(on, connCreated, connClosed, connFailed, batchEmpty,
+ batchUnderflow, batchComplete, eventDrainAttempt, eventDrainSuccess);
+
+ skc.start();
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StartTime", getStartTime(on) > start1);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+
+ int eventDrainAttempt2 = random.nextInt(MAX_BOUNDS);
+ int eventDrainSuccess2 = random.nextInt(MAX_BOUNDS);
+
+ skc.addToEventDrainAttemptCount(eventDrainAttempt2);
+ skc.addToEventDrainSuccessCount(eventDrainSuccess2);
+
+ assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L,
+ eventDrainAttempt2, eventDrainSuccess2);
+ }
+
+ @Test
+ public void testChannelCounter() throws Exception {
+ String name = getRandomName();
+
+ ChannelCounter chc = new ChannelCounter(name);
+ ObjectName on = new ObjectName(CHANNEL_OBJ_NAME_PREFIX + name);
+ assertChCounterState(on, 0L, 0L, 0L, 0L, 0L);
+
+ Assert.assertTrue("StartTime", getStartTime(on) == 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ chc.start();
+
+ long start1 = getStartTime(on);
+
+ Assert.assertTrue("StartTime", start1 != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ int numChannelSize = random.nextInt(MAX_BOUNDS);
+ int numEventPutAttempt = random.nextInt(MAX_BOUNDS);
+ int numEventTakeAttempt = random.nextInt(MAX_BOUNDS);
+ int numEventPutSuccess = random.nextInt(MAX_BOUNDS);
+ int numEventTakeSuccess = random.nextInt(MAX_BOUNDS);
+
+ chc.setChannelSize(numChannelSize);
+ for (int i = 0; i<numEventPutAttempt; i++) {
+ chc.incrementEventPutAttemptCount();
+ }
+ for (int i = 0; i<numEventTakeAttempt; i++) {
+ chc.incrementEventTakeAttemptCount();
+ }
+ chc.addToEventPutSuccessCount(numEventPutSuccess);
+ chc.addToEventTakeSuccessCount(numEventTakeSuccess);
+
+ assertChCounterState(on, numChannelSize, numEventPutAttempt,
+ numEventTakeAttempt, numEventPutSuccess, numEventTakeSuccess);
+
+ chc.stop();
+
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+ assertChCounterState(on, numChannelSize, numEventPutAttempt,
+ numEventTakeAttempt, numEventPutSuccess, numEventTakeSuccess);
+
+ chc.start();
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StartTime", getStartTime(on) > start1);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ assertChCounterState(on, 0L, 0L, 0L, 0L, 0L);
+ }
+
+ @Test
+ public void testSourceCounter() throws Exception {
+ String name = getRandomName();
+
+ SourceCounter srcc = new SourceCounter(name);
+ ObjectName on = new ObjectName(SOURCE_OBJ_NAME_PREFIX + name);
+
+ assertSrcCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L);
+
+ Assert.assertTrue("StartTime", getStartTime(on) == 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ srcc.start();
+
+ long start1 = getStartTime(on);
+
+ Assert.assertTrue("StartTime", start1 != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ int numEventReceived = random.nextInt(MAX_BOUNDS);
+ int numEventAccepted = random.nextInt(MAX_BOUNDS);
+ int numAppendReceived = random.nextInt(MAX_BOUNDS);
+ int numAppendAccepted = random.nextInt(MAX_BOUNDS);
+ int numAppendBatchReceived = random.nextInt(MAX_BOUNDS);
+ int numAppendBatchAccepted = random.nextInt(MAX_BOUNDS);
+
+ srcc.addToEventReceivedCount(numEventReceived);
+ srcc.addToEventAcceptedCount(numEventAccepted);
+ for (int i = 0; i<numAppendReceived; i++) {
+ srcc.incrementAppendReceivedCount();
+ }
+ for (int i = 0; i<numAppendAccepted; i++) {
+ srcc.incrementAppendAcceptedCount();
+ }
+ for (int i = 0; i<numAppendBatchReceived; i++) {
+ srcc.incrementAppendBatchReceivedCount();
+ }
+ for (int i = 0; i<numAppendBatchAccepted; i++) {
+ srcc.incrementAppendBatchAcceptedCount();
+ }
+
+ assertSrcCounterState(on, numEventReceived, numEventAccepted,
+ numAppendReceived, numAppendAccepted, numAppendBatchReceived,
+ numAppendBatchAccepted);
+
+ srcc.stop();
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+ assertSrcCounterState(on, numEventReceived, numEventAccepted,
+ numAppendReceived, numAppendAccepted, numAppendBatchReceived,
+ numAppendBatchAccepted);
+
+ srcc.start();
+ Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+ Assert.assertTrue("StartTime", getStartTime(on) > start1);
+ Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+ assertSrcCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L);
+
+ int numEventReceived2 = random.nextInt(MAX_BOUNDS);
+ int numEventAccepted2 = random.nextInt(MAX_BOUNDS);
+
+ for (int i = 0; i<numEventReceived2; i++) {
+ srcc.incrementEventReceivedCount();
+ }
+
+ for (int i = 0; i<numEventAccepted2; i++) {
+ srcc.incrementEventAcceptedCount();
+ }
+
+ assertSrcCounterState(on, numEventReceived2, numEventAccepted2,
+ 0L, 0L, 0L, 0L);
+ }
+
+ private void assertSrcCounterState(ObjectName on, long eventReceivedCount,
+ long eventAcceptedCount, long appendReceivedCount,
+ long appendAcceptedCount, long appendBatchReceivedCount,
+ long appendBatchAcceptedCount) throws Exception {
+ Assert.assertEquals("SrcEventReceived",
+ getSrcEventReceivedCount(on),
+ eventReceivedCount);
+ Assert.assertEquals("SrcEventAccepted",
+ getSrcEventAcceptedCount(on),
+ eventAcceptedCount);
+ Assert.assertEquals("SrcAppendReceived",
+ getSrcAppendReceivedCount(on),
+ appendReceivedCount);
+ Assert.assertEquals("SrcAppendAccepted",
+ getSrcAppendAcceptedCount(on),
+ appendAcceptedCount);
+ Assert.assertEquals("SrcAppendBatchReceived",
+ getSrcAppendBatchReceivedCount(on),
+ appendBatchReceivedCount);
+ Assert.assertEquals("SrcAppendBatchAccepted",
+ getSrcAppendBatchAcceptedCount(on),
+ appendBatchAcceptedCount);
+ }
+
+ private void assertChCounterState(ObjectName on, long channelSize,
+ long eventPutAttempt, long eventTakeAttempt, long eventPutSuccess,
+ long eventTakeSuccess) throws Exception {
+ Assert.assertEquals("ChChannelSize",
+ getChChannelSize(on),
+ channelSize);
+ Assert.assertEquals("ChEventPutAttempt",
+ getChEventPutAttempt(on),
+ eventPutAttempt);
+ Assert.assertEquals("ChEventTakeAttempt",
+ getChEventTakeAttempt(on),
+ eventTakeAttempt);
+ Assert.assertEquals("ChEventPutSuccess",
+ getChEventPutSuccess(on),
+ eventPutSuccess);
+ Assert.assertEquals("ChEventTakeSuccess",
+ getChEventTakeSuccess(on),
+ eventTakeSuccess);
+ }
+
+ private void assertSkCounterState(ObjectName on, long connCreated,
+ long connClosed, long connFailed, long batchEmpty, long batchUnderflow,
+ long batchComplete, long eventDrainAttempt, long eventDrainSuccess)
+ throws Exception {
+ Assert.assertEquals("SkConnCreated",
+ getSkConnectionCreated(on),
+ connCreated);
+ Assert.assertEquals("SkConnClosed",
+ getSkConnectionClosed(on),
+ connClosed);
+ Assert.assertEquals("SkConnFailed",
+ getSkConnectionFailed(on),
+ connFailed);
+ Assert.assertEquals("SkBatchEmpty",
+ getSkBatchEmpty(on),
+ batchEmpty);
+ Assert.assertEquals("SkBatchUnderflow",
+ getSkBatchUnderflow(on),
+ batchUnderflow);
+ Assert.assertEquals("SkBatchComplete",
+ getSkBatchComplete(on),
+ batchComplete);
+ Assert.assertEquals("SkEventDrainAttempt",
+ getSkEventDrainAttempt(on),
+ eventDrainAttempt);
+ Assert.assertEquals("SkEventDrainSuccess",
+ getSkEventDrainSuccess(on),
+ eventDrainSuccess);
+ }
+
+ private long getStartTime(ObjectName on) throws Exception {
+ return getLongAttribute(on, ATTR_START_TIME);
+ }
+
+ private long getStopTime(ObjectName on) throws Exception {
+ return getLongAttribute(on, ATTR_STOP_TIME);
+ }
+
+ private long getSkConnectionCreated(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_CONN_CREATED);
+ }
+
+ private long getSkConnectionClosed(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_CONN_CLOSED);
+ }
+
+ private long getSkConnectionFailed(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_CONN_FAILED);
+ }
+
+ private long getSkBatchEmpty(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_BATCH_EMPTY);
+ }
+
+ private long getSkBatchUnderflow(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_BATCH_UNDERFLOW);
+ }
+
+ private long getSkBatchComplete(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_BATCH_COMPLETE);
+ }
+
+ private long getSkEventDrainAttempt(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_EVENT_DRAIN_ATTEMPT);
+ }
+
+ private long getSkEventDrainSuccess(ObjectName on) throws Exception {
+ return getLongAttribute(on, SK_ATTR_EVENT_DRAIN_SUCCESS);
+ }
+
+ private long getChChannelSize(ObjectName on) throws Exception {
+ return getLongAttribute(on, CH_ATTR_CHANNEL_SIZE);
+ }
+
+ private long getChEventPutAttempt(ObjectName on) throws Exception {
+ return getLongAttribute(on, CH_ATTR_EVENT_PUT_ATTEMPT);
+ }
+
+ private long getChEventTakeAttempt(ObjectName on) throws Exception {
+ return getLongAttribute(on, CH_ATTR_EVENT_TAKE_ATTEMPT);
+ }
+
+ private long getChEventPutSuccess(ObjectName on) throws Exception {
+ return getLongAttribute(on, CH_ATTR_EVENT_PUT_SUCCESS);
+ }
+
+ private long getChEventTakeSuccess(ObjectName on) throws Exception {
+ return getLongAttribute(on, CH_ATTR_EVENT_TAKE_SUCCESS);
+ }
+
+ private long getSrcAppendBatchAcceptedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT);
+ }
+
+ private long getSrcAppendBatchReceivedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_APPEND_BATCH_RECEVIED_COUNT);
+ }
+
+ private long getSrcAppendAcceptedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_APPEND_ACCEPTED_COUNT);
+ }
+
+ private long getSrcAppendReceivedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_APPEND_RECEVIED_COUNT);
+ }
+
+ private long getSrcEventAcceptedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_EVENT_ACCEPTED_COUNT);
+ }
+
+ private long getSrcEventReceivedCount(ObjectName on) throws Exception {
+ return getLongAttribute(on, SRC_ATTR_EVENT_RECEVIED_COUNT);
+ }
+
+ private long getLongAttribute(ObjectName on, String attr) throws Exception {
+ Object result = getAttribute(on, attr);
+ return ((Long) result).longValue();
+ }
+
+ private Object getAttribute(ObjectName objName, String attrName)
+ throws Exception {
+ return mbServer.getAttribute(objName, attrName);
+ }
+
+ private String getRandomName() {
+ return "random-" + System.nanoTime();
+ }
+
+}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Fri Jul 6 22:30:11 2012
@@ -26,8 +26,10 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -78,11 +80,13 @@ class BucketWriter {
private volatile long batchCounter;
private volatile boolean isOpen;
private volatile ScheduledFuture<Void> timedRollFuture;
+ private SinkCounter sinkCounter;
BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
Context context, String filePath, CompressionCodec codeC,
CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
- ScheduledExecutorService timedRollerPool, UserGroupInformation user) {
+ ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+ SinkCounter sinkCounter) {
this.rollInterval = rollInterval;
this.rollSize = rollSize;
this.rollCount = rollCount;
@@ -95,6 +99,7 @@ class BucketWriter {
this.formatter = formatter;
this.timedRollerPool = timedRollerPool;
this.user = user;
+ this.sinkCounter = sinkCounter;
fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
@@ -172,24 +177,33 @@ class BucketWriter {
// NOTE: tried synchronizing on the underlying Kerberos principal previously
// which caused deadlocks. See FLUME-1231.
synchronized (staticLock) {
- long counter = fileExtensionCounter.incrementAndGet();
- if (codeC == null) {
- bucketPath = filePath + "." + counter;
- // Need to get reference to FS using above config before underlying
- // writer does in order to avoid shutdown hook & IllegalStateExceptions
- fileSystem = new Path(bucketPath).getFileSystem(config);
- LOG.info("Creating " + bucketPath + IN_USE_EXT);
- writer.open(bucketPath + IN_USE_EXT, formatter);
- } else {
- bucketPath = filePath + "." + counter
- + codeC.getDefaultExtension();
- // need to get reference to FS before writer does to avoid shutdown hook
- fileSystem = new Path(bucketPath).getFileSystem(config);
- LOG.info("Creating " + bucketPath + IN_USE_EXT);
- writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+ try {
+ long counter = fileExtensionCounter.incrementAndGet();
+ if (codeC == null) {
+ bucketPath = filePath + "." + counter;
+ // Need to get reference to FS using above config before underlying
+ // writer does in order to avoid shutdown hook & IllegalStateExceptions
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ LOG.info("Creating " + bucketPath + IN_USE_EXT);
+ writer.open(bucketPath + IN_USE_EXT, formatter);
+ } else {
+ bucketPath = filePath + "." + counter
+ + codeC.getDefaultExtension();
+ // need to get reference to FS before writer does to avoid shutdown hook
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ LOG.info("Creating " + bucketPath + IN_USE_EXT);
+ writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+ }
+ } catch (Exception ex) {
+ sinkCounter.incrementConnectionFailedCount();
+ if (ex instanceof IOException) {
+ throw (IOException) ex;
+ } else {
+ throw new IOException(ex);
+ }
}
}
-
+ sinkCounter.incrementConnectionCreatedCount();
resetCounters();
// if time-based rolling is enabled, schedule the roll
@@ -234,9 +248,11 @@ class BucketWriter {
if (isOpen) {
try {
writer.close(); // could block
+ sinkCounter.incrementConnectionClosedCount();
} catch (IOException e) {
LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
IN_USE_EXT + "). Exception follows.", e);
+ sinkCounter.incrementConnectionFailedCount();
}
isOpen = false;
} else {
@@ -299,6 +315,7 @@ class BucketWriter {
// write the event
try {
+ sinkCounter.incrementEventDrainAttemptCount();
writer.append(event, formatter); // could block
} catch (IOException e) {
LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Fri Jul 6 22:30:11 2012
@@ -20,7 +20,6 @@ package org.apache.flume.sink.hdfs;
import java.io.File;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.LinkedHashMap;
@@ -32,6 +31,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,6 +43,7 @@ import org.apache.flume.EventDeliveryExc
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ScheduledExecutorService;
public class HDFSEventSink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory
@@ -124,6 +124,7 @@ public class HDFSEventSink extends Abstr
private long callTimeout;
private Context context;
+ private SinkCounter sinkCounter;
/*
* Extended Java LinkedHashMap for open file handle LRU queue.
@@ -263,6 +264,10 @@ public class HDFSEventSink extends Abstr
"must be > 0 and <= 24");
}
}
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
}
private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -334,8 +339,10 @@ public class HDFSEventSink extends Abstr
}
} catch (TimeoutException eT) {
future.cancel(true);
+ sinkCounter.incrementConnectionFailedCount();
throw new IOException("Callable timed out", eT);
} catch (ExecutionException e1) {
+ sinkCounter.incrementConnectionFailedCount();
Throwable cause = e1.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
@@ -371,7 +378,8 @@ public class HDFSEventSink extends Abstr
transaction.begin();
try {
Event event = null;
- for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
+ int txnEventCount = 0;
+ for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
event = channel.take();
if (event == null) {
break;
@@ -390,7 +398,7 @@ public class HDFSEventSink extends Abstr
bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, context, realPath, codeC, compType, hdfsWriter,
- formatter, timedRollerPool, proxyTicket);
+ formatter, timedRollerPool, proxyTicket, sinkCounter);
sfWriters.put(realPath, bucketWriter);
}
@@ -404,6 +412,14 @@ public class HDFSEventSink extends Abstr
append(bucketWriter, event);
}
+ if (txnEventCount == 0) {
+ sinkCounter.incrementBatchEmptyCount();
+ } else if (txnEventCount == txnEventMax) {
+ sinkCounter.incrementBatchCompleteCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
+
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
if (!bucketWriter.isBatchComplete()) {
@@ -412,6 +428,11 @@ public class HDFSEventSink extends Abstr
}
transaction.commit();
+
+ if (txnEventCount > 0) {
+ sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+ }
+
if(event == null) {
return Status.BACKOFF;
}
@@ -470,7 +491,7 @@ public class HDFSEventSink extends Abstr
sfWriters.clear();
sfWriters = null;
-
+ sinkCounter.stop();
super.stop();
}
@@ -485,6 +506,7 @@ public class HDFSEventSink extends Abstr
new ThreadFactoryBuilder().setNameFormat(rollerName).build());
this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
+ sinkCounter.start();
super.start();
}
@@ -512,7 +534,7 @@ public class HDFSEventSink extends Abstr
//HDFSEventSink will halt when keytab file is non-exist or unreadable
File kfile = new File(kerbKeytab);
if (!(kfile.isFile() && kfile.canRead())) {
- throw new IllegalArgumentException("The keyTab file: "
+ throw new IllegalArgumentException("The keyTab file: "
+ kerbKeytab + " is nonexistent or can't read. "
+ "Please specify a readable keytab file for Kerberos auth.");
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java Fri Jul 6 22:30:11 2012
@@ -18,14 +18,15 @@
*/
package org.apache.flume.sink.hdfs;
-import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
import org.apache.hadoop.io.SequenceFile;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -34,6 +35,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
+
public class TestBucketWriter {
private static Logger logger =
@@ -61,7 +64,8 @@ public class TestBucketWriter {
HDFSTextFormatter formatter = new HDFSTextFormatter();
BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
"/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
- formatter, timedRollerPool, null);
+ formatter, timedRollerPool, null,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -84,7 +88,8 @@ public class TestBucketWriter {
HDFSTextFormatter formatter = new HDFSTextFormatter();
BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
"/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
- formatter, timedRollerPool, null);
+ formatter, timedRollerPool, null,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -109,7 +114,8 @@ public class TestBucketWriter {
HDFSTextFormatter formatter = new HDFSTextFormatter();
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
"/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
- formatter, timedRollerPool, null);
+ formatter, timedRollerPool, null,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
long startNanos = System.nanoTime();