Posted to user@flume.apache.org by Xuesong Ding <xd...@appannie.com> on 2014/10/20 09:19:42 UTC

Error occurs when use flume to put data into s3

Dear all,

  We use flume-ng to write data to both S3 and HDFS, but errors occur when
an S3 file is closed. Should we adjust Flume parameters or do something
else? Any ideas? The error and our flume-ng configuration are below. Thanks
a lot.

  BTW, our flume-ng version is the one shipped with CDH 4.7.0.

  Exception stack trace:
  2014-10-14 12:00:30,158 ERROR org.apache.flume.sink.hdfs.BucketWriter: Unexpected error
com.cloudera.org.apache.http.NoHttpResponseException: The target server failed to respond
        at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
        at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
        at com.cloudera.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
        at com.cloudera.org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
        at com.cloudera.org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
        at com.cloudera.org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
        at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
        at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
        at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
        at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
        at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
        at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029)
        at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:871)
        at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:916)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:314)
        at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at org.apache.hadoop.fs.s3native.$Proxy18.copy(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:645)
        at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:541)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:589)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
        at org.apache.flume.sink.hdfs.BucketWriter.access$800(BucketWriter.java:57)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:586)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

  Flume-ng configuration:
  ## THIS FILE CONTAINS FLUME TIER_1 CONFIGURATION

# DEFINE COMPONENTS
a1.sources = r1
a1.sinks =  k1 k2 k3 k4
a1.channels = c1 c2
a1.sinkgroups = g1 g2

# SOURCE(CUSTOM)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /ssd/disk2
a1.sources.r1.deletePolicy= never
a1.sources.r1.ignorePattern= ^.*\\.tmp$
a1.sources.r1.batchSize= 1
a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
a1.sources.r1.deserializer.maxBlobLength = 300000000

# SINK (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.filePrefix = packet
a1.sinks.k1.hdfs.batchSize= 1
a1.sinks.k1.hdfs.fileSuffix = .snappy
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 500000000
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.path = hdfs:/${path}

# SINK (HDFS)
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.filePrefix = packet
a1.sinks.k2.hdfs.batchSize= 1
a1.sinks.k2.hdfs.fileSuffix = .snappy
a1.sinks.k2.hdfs.codeC = snappy
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 500000000
a1.sinks.k2.hdfs.rollInterval = 300
a1.sinks.k2.hdfs.path = hdfs://${path}

#SINK (S3)
a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.filePrefix = packet
a1.sinks.k3.hdfs.batchSize= 1
a1.sinks.k3.hdfs.fileSuffix = .snappy
a1.sinks.k3.hdfs.codeC = snappy
a1.sinks.k3.hdfs.fileType = CompressedStream
a1.sinks.k3.hdfs.rollCount = 0
a1.sinks.k3.hdfs.rollSize = 500000000
a1.sinks.k3.hdfs.rollInterval = 300
a1.sinks.k3.hdfs.path = s3n://${path}

#SINK (S3)
a1.sinks.k4.type = hdfs
a1.sinks.k4.hdfs.filePrefix = packet
a1.sinks.k4.hdfs.batchSize= 1
a1.sinks.k4.hdfs.fileSuffix = .snappy
a1.sinks.k4.hdfs.codeC = snappy
a1.sinks.k4.hdfs.fileType = CompressedStream
a1.sinks.k4.hdfs.rollCount = 0
a1.sinks.k4.hdfs.rollSize = 500000000
a1.sinks.k4.hdfs.rollInterval = 300
a1.sinks.k4.hdfs.path = s3n://${path}

a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sinkgroups.g2.sinks = k3 k4
a1.sinkgroups.g2.processor.type = load_balance
a1.sinkgroups.g2.processor.backoff = true
a1.sinkgroups.g2.processor.selector = random

# INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.preserveExisting = false

# CHANNEL (MEM)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500
a1.channels.c1.transactionCapacity = 1
#a1.channels.c1.byteCapacity = 3000000000

# CHANNEL (MEM)
a1.channels.c2.type = memory
a1.channels.c2.capacity = 500
a1.channels.c2.transactionCapacity = 1
#a1.channels.c2.byteCapacity = 3000000000

## bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

a1.sinks.k3.channel = c2
a1.sinks.k4.channel = c2

-- 
*Thanks!*

Re: Error occurs when use flume to put data into s3

Posted by Ahmed Vila <av...@devlogic.eu>.
I suspect this line is the cause:
a1.sinks.k4.hdfs.path = *s3n*://${path}

It should probably use the s3:// scheme instead of s3n://.
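
As a sketch of that change, assuming everything else in the posted configuration stays the same, the two S3 sink paths would read as follows. Sink k3 is included because it uses the same s3n:// scheme as k4, and ${path} is the placeholder from the original post, not a real location:

```properties
# S3 sinks with the scheme switched from s3n:// to s3://
# (${path} is the placeholder used in the original configuration)
a1.sinks.k3.hdfs.path = s3://${path}
a1.sinks.k4.hdfs.path = s3://${path}
```

In Apache Hadoop, s3:// and s3n:// select different filesystem implementations (block-based and native, respectively), so objects written under one scheme are not readable through the other. Nothing in this thread confirms whether the switch removes the NoHttpResponseException.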

Regards,
Ahmed

