You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/19 06:30:44 UTC

[incubator-pulsar] branch master updated: fixing/improving logging for function instance (#2586)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f8bd00  fixing/improving logging for function instance (#2586)
3f8bd00 is described below

commit 3f8bd00d961f1f56233c629746c11cbd711bf06e
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Sep 18 23:30:39 2018 -0700

    fixing/improving logging for function instance (#2586)
    
    ### Motivation
    logging for java function instance (process mode and localrun) is not working well.  Log files are created at inappropriate locations for logs not captured by the routing appender, functions bookkeeper logs are missing because we are not using the shaded classpath.
    
    The best for logging is just to put all the logs from the java instance log in a single log and not worry about the routing which is only useful when running in threaded mode
---
 .../pulsar/functions/runtime/JavaInstanceMain.java |  9 ---
 .../pulsar/functions/runtime/ProcessRuntime.java   | 22 +++---
 .../pulsar/functions/runtime/ThreadRuntime.java    |  4 +-
 .../src/main/resources/java_instance_log4j2.yml    | 78 +++-------------------
 .../functions/runtime/ProcessRuntimeTest.java      |  4 +-
 5 files changed, 31 insertions(+), 86 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 11da7c3..083686b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,27 +22,18 @@ package org.apache.pulsar.functions.runtime;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
-import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index a978a09..c5159b0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -22,32 +22,32 @@ package org.apache.pulsar.functions.runtime;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.io.InputStream;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 /**
  * A function container implemented using java thread.
  */
@@ -97,8 +97,14 @@ class ProcessRuntime implements Runtime {
             // by the child process and manually added to classpath
             args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
             args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
-            args.add("-Dpulsar.log.dir=" + logDirectory);
-            args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionDetails().getName());
+            args.add("-Dpulsar.function.log.dir=" + String.format(
+                    "%s/%s",
+                    logDirectory,
+                    FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())));
+            args.add("-Dpulsar.function.log.file=" + String.format(
+                    "%s-%s",
+                    instanceConfig.getFunctionDetails().getName(),
+                    instanceConfig.getInstanceId()));
             if (instanceConfig.getFunctionDetails().getResources() != null) {
                 Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
                 if (resources.getRam() != 0) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 5cb8ce0..d752ed7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -72,7 +72,9 @@ class ThreadRuntime implements Runtime {
     public void start() {
         log.info("ThreadContainer starting function with instance config {}", instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
-                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+                String.format("%s-%s",
+                        FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+                        instanceConfig.getInstanceId()));
         this.fnThread.start();
     }
 
diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
index 25583a6..df25367 100644
--- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
+++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
@@ -23,16 +23,10 @@ Configuration:
 
   Properties:
     Property:
-      - name: "pulsar.log.dir"
-        value: "logs"
-      - name: "pulsar.log.file"
-        value: "pulsar-functions.log"
       - name: "pulsar.log.appender"
-        value: "RoutingAppender"
+        value: "RollingFile"
       - name: "pulsar.log.level"
         value: "info"
-      - name: "pulsar.routing.appender.default"
-        value: "RollingFile"
       - name: "bk.log.level"
         value: "info"
 
@@ -48,8 +42,8 @@ Configuration:
     # Rolling file appender configuration
     RollingFile:
       name: RollingFile
-      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
-      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
+      fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
+      filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
       immediateFlush: false
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
@@ -66,19 +60,19 @@ Configuration:
       # Delete file older than 30days
       DefaultRolloverStrategy:
           Delete:
-            basePath: ${sys:pulsar.log.dir}
+            basePath: ${sys:pulsar.function.log.dir}
             maxDepth: 2
             IfFileName:
-              glob: "*/${sys:pulsar.log.file}*log.gz"
+              glob: "*/${sys:pulsar.function.log.file}*log.gz"
             IfLastModified:
               age: 30d
 
     # Rolling file appender configuration for bk
     RollingRandomAccessFile:
       name: BkRollingFile
-      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk"
-      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
-      immediateFlush: true
+      fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
+      filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
+      immediateFlush: false
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
       Policies:
@@ -94,67 +88,17 @@ Configuration:
       # Delete file older than 30days
       DefaultRolloverStrategy:
           Delete:
-            basePath: ${sys:pulsar.log.dir}
+            basePath: ${sys:pulsar.function.log.dir}
             maxDepth: 2
             IfFileName:
-              glob: "*/${sys:pulsar.log.file}.bk*log.gz"
+              glob: "*/${sys:pulsar.function.log.file}.bk*log.gz"
             IfLastModified:
               age: 30d
 
-    # Routing
-    Routing:
-      name: RoutingAppender
-      Routes:
-        pattern: "$${ctx:function}"
-        Route:
-          -
-            Routing:
-              name: InstanceRoutingAppender
-              Routes:
-                pattern: "$${ctx:instance}"
-                Route:
-                  -
-                    RollingFile:
-                      name: "Rolling-${ctx:function}"
-                      fileName : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}.log"
-                      filePattern : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
-                      PatternLayout:
-                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance-%X{instance}] %logger{1} - %msg%n"
-                      Policies:
-                        TimeBasedTriggeringPolicy:
-                          interval: 1
-                          modulate: true
-                        SizeBasedTriggeringPolicy:
-                          size: "20MB"
-                        # Trigger every day at midnight that also scan
-                        # roll-over strategy that deletes older file
-                        CronTriggeringPolicy:
-                          schedule: "0 0 0 * * ?"
-                      # Delete file older than 30days
-                      DefaultRolloverStrategy:
-                          Delete:
-                            basePath: ${sys:pulsar.log.dir}
-                            maxDepth: 2
-                            IfFileName:
-                              glob: "*/${sys:pulsar.log.file}*log.gz"
-                            IfLastModified:
-                              age: 30d
-                  - ref: "${sys:pulsar.routing.appender.default}"
-                    key: "${ctx:function}"
-          - ref: "${sys:pulsar.routing.appender.default}"
-            key: "${ctx:function}"
-
   Loggers:
 
     Logger:
-      name: org.apache.bookkeeper
-      level: "${sys:bk.log.level}"
-      additivity: false
-      AppenderRef:
-        - ref: BkRollingFile
-
-    Logger:
-      name: org.apache.distributedlog
+      name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
       level: "${sys:bk.log.level}"
       additivity: false
       AppenderRef:
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 5a6517e..6f1e2f3 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -119,7 +120,8 @@ public class ProcessRuntimeTest {
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
-                + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName()
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+                + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
                 + " --jar " + userJarFile + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()