You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Joe Stein <cr...@gmail.com> on 2012/02/09 10:34:36 UTC

stumped on failing test KAFKA-240

Hi, I am stumped on what is wrong here my test failures due to something in
the code path of the pool is getting called differently than directly to
sync?

any thoughts to what is causing this in my changes I would appreciate think
it needs a fresh set of eyes (maybe just eyes not so tired)

info] == core-kafka / kafka.producer.ProducerTest ==
[info] Test Starting: testSend
arghhhhh
[error] Test Failed: testSend
java.lang.AssertionError:
  Unexpected method call
send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])):
    send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])): expected: 1,
actual: 0
    close(): expected: 1, actual: 0
at
org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
at
org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
at
org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
at kafka.producer.SyncProducer$$EnhancerByCGLIB$$f15397e4.send(<generated>)
at
kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:126)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
at kafka.producer.Producer.zkSend(Producer.scala:143)
at kafka.producer.Producer.send(Producer.scala:105)
at kafka.producer.ProducerTest.testSend(ProducerTest.scala:138)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.internal.runners.TestMethodRunner.executeMethodBody(TestMethodRunner.java:99)
at
org.junit.internal.runners.TestMethodRunner.runUnprotected(TestMethodRunner.java:81)
at
org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
at
org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
at org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
at
org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
at
org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
at
org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
at
org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
at org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
at org.junit.internal.runners.CompositeRunner.run(CompositeRunner.java:29)
at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
at org.junit.runner.JUnitCore.run(JUnitCore.java:100)
at org.junit.runner.JUnitCore.run(JUnitCore.java:91)
at org.scalatest.junit.JUnitSuite$class.run(JUnitSuite.scala:261)
at kafka.producer.ProducerTest.run(ProducerTest.scala:39)
at
org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
at sbt.TestRunner.run(TestFramework.scala:53)
at sbt.TestRunner.runTest$1(TestFramework.scala:67)
at sbt.TestRunner.run(TestFramework.scala:76)
at
sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
at
sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
at
sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
at sbt.NamedTestTask.run(TestFramework.scala:92)
at
sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
at
sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
at sbt.impl.RunTask.runTask(RunTask.scala:85)
at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Control$.trapUnit(Control.scala:19)
at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

I originally thought the send in the sync producer and the pool calling one
method and the test calling another overloaded method was the problem but
that should have been resolved when I wrapped the test params like this

     syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new
ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new
Message("test1".getBytes))))

instead of how i had it before

     syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec
= NoCompressionCodec, messages = new Message("test1".getBytes)))

but still i am getting the error so some code path is going different
between the pool call the sync producer and the test
https://gist.github.com/1778740 argh!!


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: stumped on failing test KAFKA-240

Posted by Joe Stein <cr...@gmail.com>.
yup it is my machine works fine from my linux box, ugh.  second hurdle from
my new latop.

no worries will soldier through it

On Fri, Feb 10, 2012 at 10:34 AM, Joe Stein <cr...@gmail.com> wrote:

> thanks Jun, I just went ahead and checked out the 0.8 branch
>
> before making changes on it I went ahead and ./sbt update && ./sbt test
>
> what is weird is it is hanging on and never getting past
>
> [info] == core-kafka / kafka.javaapi.integration.PrimitiveApiTest ==
> [info] Test Starting:
> testProduceAndFetch(kafka.javaapi.integration.PrimitiveApiTest)
>
> is anyone else seeing this or is it just me?
>
> On Thu, Feb 9, 2012 at 1:13 PM, Jun Rao <ju...@gmail.com> wrote:
>
>> Joe,
>>
>> The producer code has been refactored and simplified in the 0.8 branch
>> recently (e.g., producer pool no longer calls syncprodcuer directly).
>> Could
>> you rebase you code and retry the tests?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Feb 9, 2012 at 1:34 AM, Joe Stein <cr...@gmail.com> wrote:
>>
>> > Hi, I am stumped on what is wrong here my test failures due to
>> something in
>> > the code path of the pool is getting called differently than directly to
>> > sync?
>> >
>> > any thoughts to what is causing this in my changes I would appreciate
>> think
>> > it needs a fresh set of eyes (maybe just eyes not so tired)
>> >
>> > info] == core-kafka / kafka.producer.ProducerTest ==
>> > [info] Test Starting: testSend
>> > arghhhhh
>> > [error] Test Failed: testSend
>> > java.lang.AssertionError:
>> >  Unexpected method call
>> > send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])):
>> >    send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])): expected:
>> 1,
>> > actual: 0
>> >    close(): expected: 1, actual: 0
>> > at
>> >
>> >
>> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
>> > at
>> >
>> >
>> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
>> > at
>> >
>> >
>> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
>> > at
>> kafka.producer.SyncProducer$$EnhancerByCGLIB$$f15397e4.send(<generated>)
>> > at
>> >
>> >
>> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:126)
>> > at
>> >
>> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> > at
>> >
>> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> > at
>> >
>> >
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>> > at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
>> > at kafka.producer.Producer.zkSend(Producer.scala:143)
>> > at kafka.producer.Producer.send(Producer.scala:105)
>> > at kafka.producer.ProducerTest.testSend(ProducerTest.scala:138)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> >
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> > at
>> >
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> > at java.lang.reflect.Method.invoke(Method.java:597)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestMethodRunner.executeMethodBody(TestMethodRunner.java:99)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestMethodRunner.runUnprotected(TestMethodRunner.java:81)
>> > at
>> >
>> >
>> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
>> > at
>> >
>> org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
>> > at
>> >
>> >
>> org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
>> > at
>> >
>> >
>> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
>> > at
>> org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
>> > at
>> org.junit.internal.runners.CompositeRunner.run(CompositeRunner.java:29)
>> > at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
>> > at org.junit.runner.JUnitCore.run(JUnitCore.java:100)
>> > at org.junit.runner.JUnitCore.run(JUnitCore.java:91)
>> > at org.scalatest.junit.JUnitSuite$class.run(JUnitSuite.scala:261)
>> > at kafka.producer.ProducerTest.run(ProducerTest.scala:39)
>> > at
>> >
>> >
>> org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
>> > at sbt.TestRunner.run(TestFramework.scala:53)
>> > at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>> > at sbt.TestRunner.run(TestFramework.scala:76)
>> > at
>> >
>> >
>> sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>> > at
>> >
>> >
>> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>> > at
>> >
>> >
>> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>> > at sbt.NamedTestTask.run(TestFramework.scala:92)
>> > at
>> >
>> >
>> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>> > at
>> >
>> >
>> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>> > at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>> > at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>> > at sbt.impl.RunTask.runTask(RunTask.scala:85)
>> > at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>> > at
>> sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>> > at
>> sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>> > at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>> > at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>> > at sbt.Control$.trapUnit(Control.scala:19)
>> > at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
>> >
>> > I originally thought the send in the sync producer and the pool calling
>> one
>> > method and the test calling another overloaded method was the problem
>> but
>> > that should have been resolved when I wrapped the test params like this
>> >
>> >     syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new
>> > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages =
>> new
>> > Message("test1".getBytes))))
>> >
>> > instead of how i had it before
>> >
>> >     syncProducer2.send(topic, 0, new
>> ByteBufferMessageSet(compressionCodec
>> > = NoCompressionCodec, messages = new Message("test1".getBytes)))
>> >
>> > but still i am getting the error so some code path is going different
>> > between the pool call the sync producer and the test
>> > https://gist.github.com/1778740 argh!!
>> >
>> >
>> > /*
>> > Joe Stein
>> > http://www.linkedin.com/in/charmalloc
>> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> > */
>> >
>>
>
>
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: stumped on failing test KAFKA-240

Posted by Joe Stein <cr...@gmail.com>.
thanks Jun, I just went ahead and checked out the 0.8 branch

before making changes on it I went ahead and ./sbt update && ./sbt test

what is weird is it is hanging on and never getting past

[info] == core-kafka / kafka.javaapi.integration.PrimitiveApiTest ==
[info] Test Starting:
testProduceAndFetch(kafka.javaapi.integration.PrimitiveApiTest)

is anyone else seeing this or is it just me?

On Thu, Feb 9, 2012 at 1:13 PM, Jun Rao <ju...@gmail.com> wrote:

> Joe,
>
> The producer code has been refactored and simplified in the 0.8 branch
> recently (e.g., producer pool no longer calls syncprodcuer directly). Could
> you rebase you code and retry the tests?
>
> Thanks,
>
> Jun
>
> On Thu, Feb 9, 2012 at 1:34 AM, Joe Stein <cr...@gmail.com> wrote:
>
> > Hi, I am stumped on what is wrong here my test failures due to something
> in
> > the code path of the pool is getting called differently than directly to
> > sync?
> >
> > any thoughts to what is causing this in my changes I would appreciate
> think
> > it needs a fresh set of eyes (maybe just eyes not so tired)
> >
> > info] == core-kafka / kafka.producer.ProducerTest ==
> > [info] Test Starting: testSend
> > arghhhhh
> > [error] Test Failed: testSend
> > java.lang.AssertionError:
> >  Unexpected method call
> > send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])):
> >    send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])): expected: 1,
> > actual: 0
> >    close(): expected: 1, actual: 0
> > at
> >
> >
> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
> > at
> >
> >
> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
> > at
> >
> >
> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
> > at
> kafka.producer.SyncProducer$$EnhancerByCGLIB$$f15397e4.send(<generated>)
> > at
> >
> >
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:126)
> > at
> > kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> > at
> > kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> > at kafka.producer.Producer.zkSend(Producer.scala:143)
> > at kafka.producer.Producer.send(Producer.scala:105)
> > at kafka.producer.ProducerTest.testSend(ProducerTest.scala:138)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at
> >
> >
> org.junit.internal.runners.TestMethodRunner.executeMethodBody(TestMethodRunner.java:99)
> > at
> >
> >
> org.junit.internal.runners.TestMethodRunner.runUnprotected(TestMethodRunner.java:81)
> > at
> >
> >
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
> > at
> >
> >
> org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
> > at
> > org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
> > at
> >
> >
> org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
> > at
> >
> >
> org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
> > at
> >
> >
> org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
> > at
> >
> >
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
> > at
> org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
> > at
> org.junit.internal.runners.CompositeRunner.run(CompositeRunner.java:29)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:100)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:91)
> > at org.scalatest.junit.JUnitSuite$class.run(JUnitSuite.scala:261)
> > at kafka.producer.ProducerTest.run(ProducerTest.scala:39)
> > at
> >
> >
> org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
> > at sbt.TestRunner.run(TestFramework.scala:53)
> > at sbt.TestRunner.runTest$1(TestFramework.scala:67)
> > at sbt.TestRunner.run(TestFramework.scala:76)
> > at
> >
> >
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
> > at
> >
> >
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> > at
> >
> >
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> > at sbt.NamedTestTask.run(TestFramework.scala:92)
> > at
> >
> >
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> > at
> >
> >
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> > at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
> > at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
> > at sbt.impl.RunTask.runTask(RunTask.scala:85)
> > at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
> > at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> > at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> > at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> > at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> > at sbt.Control$.trapUnit(Control.scala:19)
> > at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
> >
> > I originally thought the send in the sync producer and the pool calling
> one
> > method and the test calling another overloaded method was the problem but
> > that should have been resolved when I wrapped the test params like this
> >
> >     syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new
> > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages =
> new
> > Message("test1".getBytes))))
> >
> > instead of how i had it before
> >
> >     syncProducer2.send(topic, 0, new
> ByteBufferMessageSet(compressionCodec
> > = NoCompressionCodec, messages = new Message("test1".getBytes)))
> >
> > but still i am getting the error so some code path is going different
> > between the pool call the sync producer and the test
> > https://gist.github.com/1778740 argh!!
> >
> >
> > /*
> > Joe Stein
> > http://www.linkedin.com/in/charmalloc
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > */
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: stumped on failing test KAFKA-240

Posted by Jun Rao <ju...@gmail.com>.
Joe,

The producer code has been refactored and simplified in the 0.8 branch
recently (e.g., producer pool no longer calls syncprodcuer directly). Could
you rebase you code and retry the tests?

Thanks,

Jun

On Thu, Feb 9, 2012 at 1:34 AM, Joe Stein <cr...@gmail.com> wrote:

> Hi, I am stumped on what is wrong here my test failures due to something in
> the code path of the pool is getting called differently than directly to
> sync?
>
> any thoughts to what is causing this in my changes I would appreciate think
> it needs a fresh set of eyes (maybe just eyes not so tired)
>
> info] == core-kafka / kafka.producer.ProducerTest ==
> [info] Test Starting: testSend
> arghhhhh
> [error] Test Failed: testSend
> java.lang.AssertionError:
>  Unexpected method call
> send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])):
>    send(ProducerRequest(0,-1,TODO,0,0:[test-topic:[0,15]])): expected: 1,
> actual: 0
>    close(): expected: 1, actual: 0
> at
>
> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
> at
>
> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
> at
>
> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
> at kafka.producer.SyncProducer$$EnhancerByCGLIB$$f15397e4.send(<generated>)
> at
>
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:126)
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> at kafka.producer.Producer.zkSend(Producer.scala:143)
> at kafka.producer.Producer.send(Producer.scala:105)
> at kafka.producer.ProducerTest.testSend(ProducerTest.scala:138)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
>
> org.junit.internal.runners.TestMethodRunner.executeMethodBody(TestMethodRunner.java:99)
> at
>
> org.junit.internal.runners.TestMethodRunner.runUnprotected(TestMethodRunner.java:81)
> at
>
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
> at
>
> org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
> at
> org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
> at
>
> org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
> at
>
> org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
> at
>
> org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
> at
>
> org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
> at org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
> at org.junit.internal.runners.CompositeRunner.run(CompositeRunner.java:29)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:100)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:91)
> at org.scalatest.junit.JUnitSuite$class.run(JUnitSuite.scala:261)
> at kafka.producer.ProducerTest.run(ProducerTest.scala:39)
> at
>
> org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
> at sbt.TestRunner.run(TestFramework.scala:53)
> at sbt.TestRunner.runTest$1(TestFramework.scala:67)
> at sbt.TestRunner.run(TestFramework.scala:76)
> at
>
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
> at
>
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> at
>
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> at sbt.NamedTestTask.run(TestFramework.scala:92)
> at
>
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> at
>
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
> at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
> at sbt.impl.RunTask.runTask(RunTask.scala:85)
> at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
> at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> at sbt.Control$.trapUnit(Control.scala:19)
> at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
>
> I originally thought the send in the sync producer and the pool calling one
> method and the test calling another overloaded method was the problem but
> that should have been resolved when I wrapped the test params like this
>
>     syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new
> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new
> Message("test1".getBytes))))
>
> instead of how i had it before
>
>     syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec
> = NoCompressionCodec, messages = new Message("test1".getBytes)))
>
> but still i am getting the error so some code path is going different
> between the pool call the sync producer and the test
> https://gist.github.com/1778740 argh!!
>
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>