You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/18 10:26:25 UTC

svn commit: r1362809 - in /flume/trunk/flume-ng-core: pom.xml src/main/java/org/apache/flume/channel/ChannelProcessor.java src/main/java/org/apache/flume/source/AvroSource.java src/test/java/org/apache/flume/channel/TestChannelProcessor.java

Author: hshreedharan
Date: Wed Jul 18 08:26:25 2012
New Revision: 1362809

URL: http://svn.apache.org/viewvc?rev=1362809&view=rev
Log:
FLUME-1377. ChannelProcessor should not throw NPE if channel.getTransaction throws.

(Mike Percy via Hari Shreedharan)

Added:
    flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java   (with props)
Modified:
    flume/trunk/flume-ng-core/pom.xml
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java

Modified: flume/trunk/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/pom.xml?rev=1362809&r1=1362808&r2=1362809&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/pom.xml (original)
+++ flume/trunk/flume-ng-core/pom.xml Wed Jul 18 08:26:25 2012
@@ -164,6 +164,12 @@ limitations under the License.
       <artifactId>servlet-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java?rev=1362809&r1=1362808&r2=1362809&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java Wed Jul 18 08:26:25 2012
@@ -173,9 +173,9 @@ public class ChannelProcessor implements
 
     // Process required channels
     for (Channel reqChannel : reqChannelQueue.keySet()) {
-      Transaction tx = null;
+      Transaction tx = reqChannel.getTransaction();
+      Preconditions.checkNotNull(tx, "Transaction object must not be null");
       try {
-        tx = reqChannel.getTransaction();
         tx.begin();
 
         List<Event> batch = reqChannelQueue.get(reqChannel);
@@ -204,9 +204,9 @@ public class ChannelProcessor implements
 
     // Process optional channels
     for (Channel optChannel : optChannelQueue.keySet()) {
-      Transaction tx = null;
+      Transaction tx = optChannel.getTransaction();
+      Preconditions.checkNotNull(tx, "Transaction object must not be null");
       try {
-        tx = optChannel.getTransaction();
         tx.begin();
 
         List<Event> batch = optChannelQueue.get(optChannel);

Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1362809&r1=1362808&r2=1362809&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Wed Jul 18 08:26:25 2012
@@ -235,9 +235,12 @@ public class AvroSource extends Abstract
 
     try {
       getChannelProcessor().processEventBatch(batch);
-    } catch (ChannelException ex) {
+    } catch (Throwable t) {
       logger.error("Avro source " + getName() + ": Unable to process event " +
-          "batch. Exception follows.", ex);
+          "batch. Exception follows.", t);
+      if (t instanceof Error) {
+        throw (Error) t;
+      }
       return Status.FAILED;
     }
 

Added: flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java?rev=1362809&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java (added)
+++ flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java Wed Jul 18 08:26:25 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.channel;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+public class TestChannelProcessor {
+
+  /**
+   * Ensure that we bubble up any specific exception thrown from getTransaction
+   * instead of another exception masking it such as an NPE
+   */
+  @Test(expected = ChannelException.class)
+  public void testExceptionFromGetTransaction() {
+    // create a channel which unexpectedly throws a ChEx on getTransaction()
+    Channel ch = mock(Channel.class);
+    when(ch.getTransaction()).thenThrow(new ChannelException("doh!"));
+
+    ChannelSelector sel = new ReplicatingChannelSelector();
+    sel.setChannels(Lists.newArrayList(ch));
+    ChannelProcessor proc = new ChannelProcessor(sel);
+
+    List<Event> events = Lists.newArrayList();
+    events.add(EventBuilder.withBody("event 1", Charsets.UTF_8));
+
+    proc.processEventBatch(events);
+  }
+
+  /**
+   * Ensure that we see the original NPE from the PreConditions check instead
+   * of an auto-generated NPE, which could be masking something else.
+   */
+  @Test
+  public void testNullFromGetTransaction() {
+    // channel which returns null from getTransaction()
+    Channel ch = mock(Channel.class);
+    when(ch.getTransaction()).thenReturn(null);
+
+    ChannelSelector sel = new ReplicatingChannelSelector();
+    sel.setChannels(Lists.newArrayList(ch));
+    ChannelProcessor proc = new ChannelProcessor(sel);
+
+    List<Event> events = Lists.newArrayList();
+    events.add(EventBuilder.withBody("event 1", Charsets.UTF_8));
+
+    boolean threw = false;
+    try {
+      proc.processEventBatch(events);
+    } catch (NullPointerException ex) {
+      threw = true;
+      Assert.assertNotNull("NPE must be manually thrown", ex.getMessage());
+    }
+    Assert.assertTrue("Must throw NPE", threw);
+  }
+
+}

Propchange: flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native