You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/20 02:48:45 UTC

svn commit: r1159824 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java

Author: jmhsieh
Date: Sat Aug 20 00:48:44 2011
New Revision: 1159824

URL: http://svn.apache.org/viewvc?rev=1159824&view=rev
Log:
FLUME-746: Correct the behavior and log messages for state transitions of WAL chunks on retry

Added:
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java
Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java?rev=1159824&r1=1159823&r2=1159824&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java Sat Aug 20 00:48:44 2011
@@ -785,13 +785,52 @@ public class NaiveFileWALManager impleme
       return;
     }
     if (data != null) {
-      if (data.s == State.SENDING || data.s == State.LOGGED) {
-        LOG.warn("There was a race that happend with SENT vs SENDING states");
-        return;
+      switch (data.s) {
+      case SENDING: {
+        // This is possible if a connection goes down. If we are currently
+        // sending this group, we should continue trying to send (no need to
+        // restart it by demoting to LOGGED)
+        LOG.info("Attempt to retry chunk in SENDING state.  Data is being sent so "
+            + "there is no need for state transition.");
+        break;
+      }
+      case LOGGED: {
+        // This is likely the most common case where we retry events spooled to
+        // disk
+        LOG.info("Attempt to retry chunk in LOGGED state.  There is no need "
+            + "for state transition.");
+        break;
+      }
+      case SENT: {
+        // This is possible if the collector goes down or if the endpoint
+        // (HDFS) goes down. Here we demote the chunk back to LOGGED state.
+        changeState(tag, State.SENT, State.LOGGED);
+        retryCount.incrementAndGet();
+        break;
+      }
+      case E2EACKED: {
+        // This is possible but very unlikely. If a group is in this state it is
+        // about to be deleted and thus doesn't need a state transition.
+        LOG.debug("Attemp to retry chunk in E2EACKED state. There is no "
+            + "need to retry because data is acked.");
+        break;
+      }
+
+      case ERROR: // should never happen
+        LOG.info("Attempt to retry chunk in ERROR state.  Data in ERROR "
+            + "state stays in ERROR state so no transition.");
+        break;
+
+      case IMPORT: // should never happen
+      case WRITING: // should never happen
+      default: {
+        String msg = "Attempting to retry from a state " + data.s
+            + " which is a state do not ever retry from.";
+        LOG.error(msg);
+        throw new IllegalStateException(msg);
+      }
       }
     }
-    changeState(tag, State.SENT, State.LOGGED);
-    retryCount.incrementAndGet();
   }
 
   @Override

Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java?rev=1159824&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java Sat Aug 20 00:48:44 2011
@@ -0,0 +1,216 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.agent;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.flume.agent.durability.NaiveFileWALManager;
+import com.cloudera.flume.agent.durability.WALManager;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.handlers.endtoend.AckListener;
+import com.cloudera.flume.handlers.rolling.Tagger;
+import com.cloudera.util.FileUtil;
+
+public class TestFlumeNodeWALNotifier {
+  // Checks FlumeNodeWALNotifier.retry(tag) against a chunk in each WAL state.
+  String tag; // returned by the mocked tagger's newTag(); passed to retry()
+  Date date; // returned by the mocked tagger's getDate()
+  Tagger mockTagger;
+  AckListener mockAl; // plain mock, nothing stubbed
+  File tmpdir; // directory given to the WAL manager; deleted in teardown()
+  NaiveFileWALManager walman;
+  Map<String, WALManager> map; // passed to FlumeNodeWALNotifier's constructor
+  EventSink snk; // created in setup() via walman.newAckWritingSink
+  EventSource src; // assigned in toSendingState()
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    tag = "foofootag";
+    date = new Date();
+    mockTagger = mock(Tagger.class);
+    when(mockTagger.getTag()).thenReturn("unused-" + tag);
+    when(mockTagger.newTag()).thenReturn(tag);
+    when(mockTagger.getDate()).thenReturn(date);
+    // The ack listener is an unstubbed mock.
+    mockAl = mock(AckListener.class);
+    // Fresh WAL manager on a new temporary directory for every test.
+    tmpdir = FileUtil.mktempdir();
+    walman = new NaiveFileWALManager(tmpdir);
+    walman.open();
+    // Single-entry map, key "wal", later handed to the notifier.
+    map = new HashMap<String, WALManager>();
+    map.put("wal", walman);
+    // Sink obtained from the WAL manager with the mocked tagger and listener.
+    snk = walman.newAckWritingSink(mockTagger, mockAl);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    FileUtil.rmr(tmpdir); // delete the temp dir created in setup()
+  }
+
+  public void triggerRetry() throws IOException {
+    FlumeNodeWALNotifier notif = new FlumeNodeWALNotifier(map);
+    notif.retry(tag); // the call under test
+  }
+
+  void toWritingState() throws IOException, InterruptedException {
+    EventImpl evt = new EventImpl("foofoodata".getBytes());
+    snk.open();
+    snk.append(evt);
+    // With the sink open after one append: one writing tag, none elsewhere.
+    assertEquals(1, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  void toLoggedState() throws IOException, InterruptedException {
+    // Closing the sink is expected to move the chunk from writing to logged.
+    snk.close();
+
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(1, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  void toSendingState() throws IOException, InterruptedException {
+    // Draining the unacked source is expected to leave the chunk in sending.
+    src = walman.getUnackedSource();
+    src.open();
+    while (src.next() != null) {
+      ; // intentionally empty: just consume every event
+    }
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(1, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  void toSentState() throws IOException, InterruptedException {
+    // Closing src (set by toSendingState()) should move the chunk to sent.
+    src.close();
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(1, walman.getSentTags().size());
+  }
+
+  void toAckedState() throws IOException {
+    // After toAcked(tag) the chunk should appear in none of the four lists.
+    walman.toAcked(tag);
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  /**
+   * If we attempt to retry something in writing state (whose tag doesn't have
+   * a complete checksum, and thus should never be retried) it should fail
+   * with IllegalStateException.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testRetryWriting() throws IOException, InterruptedException {
+    toWritingState();
+    // Same counts toWritingState() already asserted, restated before retrying.
+    assertEquals(1, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+    // retry() on a WRITING chunk is expected to throw (see expected= above).
+    triggerRetry();
+
+  }
+
+  @Test
+  public void testRetryLogged() throws IOException, InterruptedException {
+    toWritingState();
+    toLoggedState();
+
+    triggerRetry();
+    // Retrying a LOGGED chunk must leave it in LOGGED.
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(1, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  @Test
+  public void testRetrySending() throws IOException, InterruptedException {
+    toWritingState();
+    toLoggedState();
+    toSendingState();
+
+    triggerRetry();
+    // Retrying a SENDING chunk must leave it in SENDING.
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(1, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  @Test
+  public void testRetrySent() throws IOException, InterruptedException {
+    toWritingState();
+    toLoggedState();
+    toSendingState();
+    toSentState();
+
+    triggerRetry();
+    // Retrying a SENT chunk must demote it back to LOGGED.
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(1, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  @Test
+  public void testRetryAcked() throws IOException, InterruptedException {
+    toWritingState();
+    toLoggedState();
+    toSendingState();
+    toSentState();
+    toAckedState();
+
+    triggerRetry();
+    // Retrying an acked chunk must not bring it back into any of the lists.
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+}