You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/20 02:48:45 UTC
svn commit: r1159824 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java
Author: jmhsieh
Date: Sat Aug 20 00:48:44 2011
New Revision: 1159824
URL: http://svn.apache.org/viewvc?rev=1159824&view=rev
Log:
FLUME-746: Correct the behavior and logging messages about state transitions of WAL chunks on retry
Added:
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java?rev=1159824&r1=1159823&r2=1159824&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java Sat Aug 20 00:48:44 2011
@@ -785,13 +785,52 @@ public class NaiveFileWALManager impleme
return;
}
if (data != null) {
- if (data.s == State.SENDING || data.s == State.LOGGED) {
- LOG.warn("There was a race that happend with SENT vs SENDING states");
- return;
+ switch (data.s) {
+ case SENDING: {
+ // This is possible if a connection goes down. If we are currently
+ // sending this group, we should continue trying to send (no need to
+ // restart it by demoting to LOGGED)
+ LOG.info("Attempt to retry chunk in SENDING state. Data is being sent so "
+ + "there is no need for state transition.");
+ break;
+ }
+ case LOGGED: {
+ // This is likely the most common case where we retry events spooled to
+ // disk
+ LOG.info("Attempt to retry chunk in LOGGED state. There is no need "
+ + "for state transition.");
+ break;
+ }
+ case SENT: {
+ // This is possible if the collector goes down or if endpoint (HDFS)
+ // goes down. Here we demote the chunk back to LOGGED state.
+ changeState(tag, State.SENT, State.LOGGED);
+ retryCount.incrementAndGet();
+ break;
+ }
+ case E2EACKED: {
+ // This is possible but very unlikely. If a group is in this state it is
+ // about to be deleted and thus doesn't need a state transition.
+ LOG.debug("Attemp to retry chunk in E2EACKED state. There is no "
+ + "need to retry because data is acked.");
+ break;
+ }
+
+ case ERROR: // should never happen
+ LOG.info("Attempt to retry chunk in ERROR state. Data in ERROR "
+ + "state stays in ERROR state so no transition.");
+ break;
+
+ case IMPORT: // should never happen
+ case WRITING: // should never happen
+ default: {
+ String msg = "Attempting to retry from a state " + data.s
+ + " which is a state do not ever retry from.";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
}
}
- changeState(tag, State.SENT, State.LOGGED);
- retryCount.incrementAndGet();
}
@Override
Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java?rev=1159824&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestFlumeNodeWALNotifier.java Sat Aug 20 00:48:44 2011
@@ -0,0 +1,216 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.agent;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.flume.agent.durability.NaiveFileWALManager;
+import com.cloudera.flume.agent.durability.WALManager;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.handlers.endtoend.AckListener;
+import com.cloudera.flume.handlers.rolling.Tagger;
+import com.cloudera.util.FileUtil;
+
+public class TestFlumeNodeWALNotifier {
+ // Drives FlumeNodeWALNotifier.retry(tag) with a WAL chunk in each state.
+ String tag;
+ Date date;
+ Tagger mockTagger;
+ AckListener mockAl;
+ File tmpdir;
+ NaiveFileWALManager walman;
+ Map<String, WALManager> map;
+ EventSink snk;
+ EventSource src;
+ // Per-test fixture: mocks, an opened WAL manager on a temp dir, and a sink.
+ @Before
+ public void setup() throws IOException, InterruptedException {
+ tag = "foofootag";
+ date = new Date();
+ mockTagger = mock(Tagger.class);
+ when(mockTagger.getTag()).thenReturn("unused-" + tag);
+ when(mockTagger.newTag()).thenReturn(tag);
+ when(mockTagger.getDate()).thenReturn(date);
+ // newTag() hands out the tag under test; getTag() returns a different one.
+ mockAl = mock(AckListener.class);
+
+ tmpdir = FileUtil.mktempdir();
+ walman = new NaiveFileWALManager(tmpdir);
+ walman.open();
+ // This map is what triggerRetry() hands to the FlumeNodeWALNotifier.
+ map = new HashMap<String, WALManager>();
+ map.put("wal", walman);
+
+ snk = walman.newAckWritingSink(mockTagger, mockAl);
+ }
+ // Calls FileUtil.rmr on the temp directory created in setup().
+ @After
+ public void teardown() throws IOException {
+ FileUtil.rmr(tmpdir);
+ }
+ // Retries the tag through a FlumeNodeWALNotifier constructed over map.
+ public void triggerRetry() throws IOException {
+ FlumeNodeWALNotifier notif = new FlumeNodeWALNotifier(map);
+ notif.retry(tag);
+ }
+ // Opens the sink and appends one event; expects exactly one writing tag.
+ void toWritingState() throws IOException, InterruptedException {
+ EventImpl evt = new EventImpl("foofoodata".getBytes());
+ snk.open();
+ snk.append(evt);
+
+ assertEquals(1, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+ void toLoggedState() throws IOException, InterruptedException {
+ // Closing the sink moves the chunk from writing to logged.
+ snk.close();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(1, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+ void toSendingState() throws IOException, InterruptedException {
+ // Draining the unacked source leaves the chunk in the sending state.
+ src = walman.getUnackedSource();
+ src.open();
+ while (src.next() != null) {
+ ;
+ }
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(1, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+ void toSentState() throws IOException, InterruptedException {
+ // Closing the source moves the chunk from sending to sent.
+ src.close();
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(1, walman.getSentTags().size());
+ }
+
+ void toAckedState() throws IOException {
+ // After toAcked(tag), none of the four state lists below holds a tag.
+ walman.toAcked(tag);
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+ /**
+ * Retrying a chunk that is still in the writing state (its tag does not yet
+ * have a complete checksum, so it should never be retried) must fail with an
+ * IllegalStateException.
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testRetryWriting() throws IOException, InterruptedException {
+ toWritingState();
+
+ assertEquals(1, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+
+ triggerRetry();
+
+ }
+ // Retrying a chunk in the logged state must leave it logged.
+ @Test
+ public void testRetryLogged() throws IOException, InterruptedException {
+ toWritingState();
+ toLoggedState();
+
+ triggerRetry();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(1, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+ // Retrying a chunk in the sending state must leave it sending.
+ @Test
+ public void testRetrySending() throws IOException, InterruptedException {
+ toWritingState();
+ toLoggedState();
+ toSendingState();
+
+ triggerRetry();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(1, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+ // Retrying a sent chunk must move it back to logged.
+ @Test
+ public void testRetrySent() throws IOException, InterruptedException {
+ toWritingState();
+ toLoggedState();
+ toSendingState();
+ toSentState();
+
+ triggerRetry();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(1, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+ // Retrying an acked chunk must leave all four state lists empty.
+ @Test
+ public void testRetryAcked() throws IOException, InterruptedException {
+ toWritingState();
+ toLoggedState();
+ toSendingState();
+ toSentState();
+ toAckedState();
+
+ triggerRetry();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+}