Posted to user@flume.apache.org by "Wan Yi (武汉_技术部_搜索与精准化_万毅)" <wa...@yhd.com> on 2014/11/27 10:05:22 UTC

Re: Why failover sink processor does not work

I have investigated the HDFSEventSink source code and found that when the exception is an IOException, it is not rethrown to the upper layer; the sink only rolls back the transaction and returns Status.BACKOFF.
So FailoverSinkProcessor never marks this sink as dead.

    } catch (IOException eIO) {
      // IO failures are rolled back and reported as BACKOFF; nothing is thrown
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      // everything else is rolled back and rethrown to the caller
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    }
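
To illustrate the behavior described above, here is a minimal, self-contained sketch. It is a toy model and not Flume's code: ToySink, deliver and the swallowIo flag are hypothetical names made up for the example, and the loop assumes that a failover-style processor moves to the next sink only when the current sink throws. EventDeliveryException and Status are redefined locally as stand-ins.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class FailoverSketch {
    enum Status { READY, BACKOFF }

    // Local stand-in for Flume's EventDeliveryException (toy definition).
    static class EventDeliveryException extends Exception {
        EventDeliveryException(Throwable cause) { super(cause); }
    }

    // Hypothetical toy sink: 'down' simulates an unreachable HDFS cluster,
    // 'swallowIo' chooses between returning BACKOFF and rethrowing.
    static class ToySink {
        final String name;
        final boolean down;
        final boolean swallowIo;

        ToySink(String name, boolean down, boolean swallowIo) {
            this.name = name;
            this.down = down;
            this.swallowIo = swallowIo;
        }

        Status process() throws EventDeliveryException {
            try {
                if (down) {
                    throw new IOException("HDFS unreachable");
                }
                return Status.READY;
            } catch (IOException eIO) {
                if (swallowIo) {
                    // Same shape as the catch block quoted above: no exception escapes.
                    return Status.BACKOFF;
                }
                throw new EventDeliveryException(eIO);
            }
        }
    }

    // Toy failover loop (assumption): sinks are tried in priority order and the
    // next sink is tried only when the current one throws.
    static String deliver(List<ToySink> sinksByPriority) {
        for (ToySink sink : sinksByPriority) {
            try {
                return sink.name + ":" + sink.process();
            } catch (EventDeliveryException e) {
                // A real processor would penalize the sink here; the sketch just moves on.
            }
        }
        return "all sinks failed";
    }

    public static void main(String[] args) {
        // sinks1 is down and swallows the IOException: the loop never reaches sinks5.
        System.out.println(deliver(Arrays.asList(
                new ToySink("sinks1", true, true),
                new ToySink("sinks5", false, true)))); // prints "sinks1:BACKOFF"

        // sinks1 is down but rethrows: the loop falls through to sinks5.
        System.out.println(deliver(Arrays.asList(
                new ToySink("sinks1", true, false),
                new ToySink("sinks5", false, false)))); // prints "sinks5:READY"
    }
}
```

In the first call the high-priority sink keeps answering BACKOFF, so from the loop's point of view it never fails; only the second variant, where the IOException is wrapped and rethrown, lets the lower-priority sink take over.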



From: Wan Yi(武汉_技术部_搜索与精准化_万毅) [mailto:wanyi@yhd.com]
Sent: 2014-11-27 16:02
To: user@flume.apache.org
Subject: Re: Why failover sink processor does not work

By the way, I am using Flume 1.4.0.



Wayne Wan
From: Wan Yi(武汉_技术部_搜索与精准化_万毅) [mailto:wanyi@yhd.com]
Sent: 2014-11-27 15:57
To: user@flume.apache.org
Subject: Why failover sink processor does not work

I use HDFS to store our logs, but the failover processor does not seem to work when I kill the HDFS cluster used by the high-priority sink (sinks1).

Below is my config:

####  define agent
a1.sources = src1
a1.sinks = sinks1 sinks5
a1.channels = ch1
a1.sinkgroups = g1
#### define the sink group
a1.sinkgroups.g1.sinks = sinks1 sinks5
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.sinks1 = 5
a1.sinkgroups.g1.processor.priority.sinks5 = 1
a1.sinkgroups.g1.processor.maxpenalty = 1000

#### define http source
a1.sources.src1.type = **
a1.sources.src1.port = 8081
a1.sources.src1.contextPath = /
a1.sources.src1.urlPattern = /t
a1.sources.src1.handler = **
a1.sources.src1.channels = ch1

#### define hdfs sink
a1.sinks.sinks1.type = hdfs
a1.sinks.sinks1.channel = ch1
a1.sinks.sinks1.hdfs.path = hdfs://host1:9000/user/hadoop/flume/ds=%y-%m-%d
a1.sinks.sinks1.hdfs.filePrefix = %{host}
a1.sinks.sinks1.hdfs.batchSize = 1000
a1.sinks.sinks1.hdfs.rollCount = 0
a1.sinks.sinks1.hdfs.rollSize = 0
a1.sinks.sinks1.hdfs.rollInterval = 300
a1.sinks.sinks1.hdfs.idleTimeout = 1800000
a1.sinks.sinks1.hdfs.callTimeout = 20000
a1.sinks.sinks1.hdfs.threadsPoolSize = 250
a1.sinks.sinks1.hdfs.writeFormat = Text
a1.sinks.sinks1.hdfs.fileType = DataStream

#### define hdfs sink
a1.sinks.sinks5.type = hdfs
a1.sinks.sinks5.channel = ch1
a1.sinks.sinks5.hdfs.path = hdfs://host2:8020/user/hadoop/flume/ds=%y-%m-%d
a1.sinks.sinks5.hdfs.filePrefix = %{host}
a1.sinks.sinks5.hdfs.batchSize = 1000
a1.sinks.sinks5.hdfs.rollCount = 0
a1.sinks.sinks5.hdfs.rollSize = 0
a1.sinks.sinks5.hdfs.rollInterval = 300
a1.sinks.sinks5.hdfs.idleTimeout = 1800000
a1.sinks.sinks5.hdfs.callTimeout = 20000
a1.sinks.sinks5.hdfs.threadsPoolSize = 250
a1.sinks.sinks5.hdfs.writeFormat = Text
a1.sinks.sinks5.hdfs.fileType = DataStream


#### define memory channel1
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 10000
a1.channels.ch1.transactionCapacity = 1000





Wayne Wan


Re: Re: Why failover sink processor does not work

Posted by Arvind Prabhakar <ar...@streamsets.com>.
Hi Wayne,

Thanks for doing the investigation; this seems like a legitimate problem. I
have created an issue to track it (FLUME-2564
<https://issues.apache.org/jira/browse/FLUME-2564>). If you already
have a patch that addresses this problem, please post it on the Jira.

Regards,
Arvind Prabhakar
