You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/05 19:30:29 UTC

[GitHub] [pulsar] borlandor edited a comment on issue #5099: "ConnectException: Connection refused: /127.0.0.1:49695" when running python example function

borlandor edited a comment on issue #5099: "ConnectException: Connection refused: /127.0.0.1:49695"  when running python example function
URL: https://github.com/apache/pulsar/issues/5099#issuecomment-528240374
 
 
   My Java function also has this problem:
   ```
   15:12:04.466 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Health check failed for ContextWindowFunction-0
   java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_222]
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_222]
           at org.apache.pulsar.functions.runtime.ProcessRuntime.lambda$start$1(ProcessRuntime.java:164) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
   Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
           at io.grpc.Status.asRuntimeException(Status.java:530) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) ~[io.grpc-grpc-stub-1.18.0.jar:1.18.0]
           at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_222]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_222]
           ... 1 more
   Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:42455
           at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222]
           at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222]
           at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           ... 2 more
   Caused by: java.net.ConnectException: Connection refused
           at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222]
           at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222]
           at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           ... 2 more
   15:12:04.524 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Extracted Process death exception
   java.lang.RuntimeException: 
           at org.apache.pulsar.functions.runtime.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:380) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at org.apache.pulsar.functions.runtime.ProcessRuntime.isAlive(ProcessRuntime.java:367) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
   ```
   
   My modified ContextWindowFunction.java :
   ```
   package org.apache.pulsar.functions.api.examples;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.functions.api.WindowContext;
   import org.apache.pulsar.functions.api.WindowFunction;
   
   import java.util.Collection;
   import java.util.Set;
   import java.util.Map;
   import java.util.HashMap;
   import java.util.Optional;
   //import java.util.concurrent.CompletableFuture;
   
   import com.alibaba.fastjson.JSON;
   import com.alibaba.fastjson.JSONObject;
   import org.apache.pulsar.functions.api.examples.pojo.MoteStatValue;
   
   /**
    * Example Function that acts on a window of tuples at a time rather than per tuple basis.
    */
   
   @Slf4j
   public class ContextWindowFunction implements WindowFunction<String, String> {
       @Override
       public String process(Collection<Record<String>> LogItems, WindowContext context) {
           //JSONArray jsonArray = new JSONArray();
   
           Map<String, MoteStatValue > mapMoteStat = new HashMap<String, MoteStatValue>();
   
           for (Record<String> record : LogItems) 
           {
   						context.publish("test_debug", record.getValue());
   						JSONObject jsonObject = JSON.parseObject(record.getValue());
   						context.publish("test_debug", "JSONObject.parseObject success!");
   						String s = jsonObject.getString("pktype");
   /*
   						if (s.equals("motetx"))
   						{
   							String eui = jsonObject.getString("eui");
   						  context.publish("test_debug", eui);
   							MoteStatValue statValue = mapMoteStat.get(eui);
   							if (statValue == null)
   							{
   								statValue = new MoteStatValue(0,0);
   							}
   							  
   							statValue.incUpNbs();
   							statValue.incUpThroughput(jsonObject.getIntValue("payloadlen"));
   
   							mapMoteStat.put(eui,statValue);
   							
   						}
    
   */ 
           }
           //Set setMoteStat = mapMoteStat.entrySet();
   			  //context.publish("test_debug", setMoteStat.size() );
           //return setMoteStat.toString();
           return "abacdefg";
       }
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services