You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by cc...@apache.org on 2024/01/17 15:48:14 UTC

(yunikorn-core) branch master updated: [YUNIKORN-2320] Package-level logging breaks custom logging configuration (#773)

This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 4323f643 [YUNIKORN-2320] Package-level logging breaks custom logging configuration (#773)
4323f643 is described below

commit 4323f643fa79e3294e65e2cf7fcca2900322af01
Author: Ovidiu Feodorov <o_...@apple.com>
AuthorDate: Wed Jan 17 09:43:24 2024 -0600

    [YUNIKORN-2320] Package-level logging breaks custom logging configuration (#773)
    
    The rate-limited logger functionality added in YUNIKORN-1985 prevents
    custom logging configuration from working, as the package-level init
    happens before the custom configuration can take effect. Refactor
    the init into a sync.Once that fires on the first logged message instead.
    
    Closes: #773
    
    Signed-off-by: Craig Condit <cc...@apache.org>
---
 pkg/entrypoint/entrypoint_test.go         | 100 ++++++++++++++++++++++++++++++
 pkg/log/rate_limited_logger.go            |  22 +++----
 pkg/log/rate_limited_logger_test.go       |  18 +++---
 pkg/scheduler/objects/application.go      |  17 +++--
 pkg/scheduler/objects/application_test.go |   5 ++
 5 files changed, 138 insertions(+), 24 deletions(-)

diff --git a/pkg/entrypoint/entrypoint_test.go b/pkg/entrypoint/entrypoint_test.go
new file mode 100644
index 00000000..9d8196f8
--- /dev/null
+++ b/pkg/entrypoint/entrypoint_test.go
@@ -0,0 +1,100 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package entrypoint
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/scheduler/objects"
+
+	"go.uber.org/zap"
+	"gotest.tools/v3/assert"
+)
+
+const relTestDataDir = "build" // test data directory, relative to the project root
+const logFileName = "test.log"
+const logMessage = "log message sent via the core logger"
+
+// TestCustomLoggingConfiguration ensures that custom logging configuration takes effect even in the presence of
+// "objects" package initialization. "objects" package initialization used to break custom logging configuration
+// by triggering rateLimitedLogger initialization in the package lexical scope, which in turn triggered the one-time
+// logging system initialization, preventing subsequent custom configuration. See YUNIKORN-2320.
+func TestCustomLoggingConfiguration(t *testing.T) {
+	defer cleanup()
+	// ensure that the "objects" package initialization happens
+	app := objects.Application{}
+	assert.Equal(t, "", app.ApplicationID)
+	testDataDir, err := getWritableTestDataDir()
+	assert.NilError(t, err, "failed to get the test data directory")
+	logFile := filepath.Join(testDataDir, logFileName)
+	config := zap.NewDevelopmentConfig()
+	config.OutputPaths = []string{logFile}
+	config.ErrorOutputPaths = []string{logFile}
+	logger, err := config.Build()
+	assert.NilError(t, err, "zap Logger creation failed")
+	log.InitializeLogger(logger, &config)
+	StartAllServices()
+	ykManagedLogger := log.Log(log.Core)
+	ykManagedLogger.Info(logMessage)
+	err = ykManagedLogger.Sync()
+	if err != nil {
+		// if it fails to sync, it may be because the logger is still using /dev/stderr
+		fmt.Printf("%v\n", err)
+	}
+	// make sure the test log messages are in the log file
+	bs, err := os.ReadFile(logFile)
+	assert.NilError(t, err, "failed to read the log file", logFile)
+	assert.Equal(t, strings.Contains(string(bs), logMessage), true, "'%s' not found in the log file %s", logMessage, logFile)
+}
+
+// getWritableTestDataDir returns the absolute path of the directory where we can write test data on the local
+// file system. The directory is validated first: an error is returned unless it exists in the file system.
+func getWritableTestDataDir() (string, error) {
+	dir, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	buildDir := filepath.Join(dir, "../../", relTestDataDir)
+	_, err = os.Stat(buildDir)
+	if err != nil {
+		return "", err
+	}
+	return buildDir, nil
+}
+
+// cleanup removes the test log file, if created in the writable test data directory. Noop if the file is not present.
+func cleanup() {
+	testDataDir, err := getWritableTestDataDir()
+	if err != nil {
+		fmt.Printf("%v\n", err)
+	}
+	err = os.Remove(filepath.Join(testDataDir, logFileName))
+	if err != nil {
+		if os.IsNotExist(err) {
+			// ignore
+			return
+		}
+		fmt.Printf("%v\n", err)
+	}
+}
diff --git a/pkg/log/rate_limited_logger.go b/pkg/log/rate_limited_logger.go
index 9618a7fd..0af0d706 100644
--- a/pkg/log/rate_limited_logger.go
+++ b/pkg/log/rate_limited_logger.go
@@ -25,56 +25,56 @@ import (
 	"golang.org/x/time/rate"
 )
 
-type rateLimitedLogger struct {
+type RateLimitedLogger struct {
 	logger  *zap.Logger
 	limiter *rate.Limiter
 }
 
-// RateLimitedLogger provides a logger that only logs once within a specified duration
-func RateLimitedLog(handle *LoggerHandle, every time.Duration) *rateLimitedLogger {
-	return &rateLimitedLogger{
+// NewRateLimitedLogger provides a logger that only logs once within a specified duration.
+func NewRateLimitedLogger(handle *LoggerHandle, every time.Duration) *RateLimitedLogger {
+	return &RateLimitedLogger{
 		logger:  Log(handle),
 		limiter: rate.NewLimiter(rate.Every(every), 1),
 	}
 }
 
-func (rl *rateLimitedLogger) Debug(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Debug(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Debug(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) Info(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Info(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Info(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) Warn(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Warn(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Warn(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) Error(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Error(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Error(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) DPanic(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) DPanic(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.DPanic(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) Panic(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Panic(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Panic(msg, fields...)
 	}
 }
 
-func (rl *rateLimitedLogger) Fatal(msg string, fields ...zap.Field) {
+func (rl *RateLimitedLogger) Fatal(msg string, fields ...zap.Field) {
 	if rl.limiter.Allow() {
 		rl.logger.Fatal(msg, fields...)
 	}
diff --git a/pkg/log/rate_limited_logger_test.go b/pkg/log/rate_limited_logger_test.go
index c732f03c..98513a10 100644
--- a/pkg/log/rate_limited_logger_test.go
+++ b/pkg/log/rate_limited_logger_test.go
@@ -22,12 +22,12 @@ import (
 	"bufio"
 	"bytes"
 	"encoding/json"
+	"sync"
 	"testing"
 	"time"
 
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
-	"golang.org/x/time/rate"
 	"gotest.tools/v3/assert"
 )
 
@@ -37,6 +37,9 @@ type logMessage struct {
 }
 
 func TestRateLimitedLog(t *testing.T) {
+	defer resetTestLogger()
+	once = sync.Once{}
+	config := zap.NewDevelopmentConfig()
 	encoderConfig := zap.NewDevelopmentEncoderConfig()
 	buf := bytes.Buffer{}
 	writer := bufio.NewWriter(&buf)
@@ -47,12 +50,9 @@ func TestRateLimitedLog(t *testing.T) {
 			zap.NewAtomicLevelAt(zap.InfoLevel),
 		),
 	)
+	InitializeLogger(zapLogger, &config)
 	// log once within one minute
-	logger := &rateLimitedLogger{
-		logger:  zapLogger,
-		limiter: rate.NewLimiter(rate.Every(time.Minute), 1),
-	}
-
+	logger := NewRateLimitedLogger(Core, 1*time.Minute)
 	startTime := time.Now()
 	for {
 		elapsed := time.Since(startTime)
@@ -62,10 +62,10 @@ func TestRateLimitedLog(t *testing.T) {
 		logger.Info("YuniKorn")
 		time.Sleep(10 * time.Millisecond)
 	}
-	writer.Flush()
-
+	err := writer.Flush()
+	assert.NilError(t, err, "failed to flush writer")
 	var lm logMessage
-	err := json.Unmarshal(buf.Bytes(), &lm)
+	err = json.Unmarshal(buf.Bytes(), &lm)
 	assert.NilError(t, err, "failed to unmarshal logMessage from buffer: %s", buf.Bytes())
 	assert.Equal(t, "INFO", lm.Level)
 	assert.Equal(t, "YuniKorn", lm.Message)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 80c27d1e..bc7793d9 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -51,7 +51,8 @@ var (
 	defaultPlaceholderTimeout = 15 * time.Minute
 )
 
-var rateLimitedLog = log.RateLimitedLog(log.SchedApplication, time.Second)
+var initAppLogOnce sync.Once
+var rateLimitedAppLog *log.RateLimitedLogger
 
 const (
 	Soft string = "Soft"
@@ -973,7 +974,7 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
 			// the iterator might not have the node we need as it could be reserved, or we have not added it yet
 			node := getNodeFn(requiredNode)
 			if node == nil {
-				rateLimitedLog.Warn("required node is not found (could be transient)",
+				getRateLimitedAppLog().Info("required node is not found (could be transient)",
 					zap.String("application ID", sa.ApplicationID),
 					zap.String("allocationKey", request.GetAllocationKey()),
 					zap.String("required node", requiredNode))
@@ -2047,12 +2048,12 @@ func (sa *Application) HasPlaceholderAllocation() bool {
 	return sa.hasPlaceholderAlloc
 }
 
-// test only
+// SetCompletingTimeout should be used for testing only.
 func SetCompletingTimeout(duration time.Duration) {
 	completingTimeout = duration
 }
 
-// test only
+// SetTimedOutPlaceholder should be used for testing only.
 func (sa *Application) SetTimedOutPlaceholder(taskGroupName string, timedOut int64) {
 	sa.Lock()
 	defer sa.Unlock()
@@ -2063,3 +2064,11 @@ func (sa *Application) SetTimedOutPlaceholder(taskGroupName string, timedOut int
 		sa.placeholderData[taskGroupName].TimedOut = timedOut
 	}
 }
+
+// getRateLimitedAppLog lazily initializes the application logger the first time it is needed.
+func getRateLimitedAppLog() *log.RateLimitedLogger {
+	initAppLogOnce.Do(func() {
+		rateLimitedAppLog = log.NewRateLimitedLogger(log.SchedApplication, time.Second)
+	})
+	return rateLimitedAppLog
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 37c0f9a0..3ad703c9 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2357,6 +2357,11 @@ func TestGetOutstandingRequests(t *testing.T) {
 	assert.Equal(t, 0, len(total4), "expected no outstanding requests for TestCase 4")
 }
 
+func TestGetRateLimitedAppLog(t *testing.T) {
+	l := getRateLimitedAppLog()
+	assert.Check(t, l != nil)
+}
+
 func (sa *Application) addPlaceholderDataWithLocking(ask *AllocationAsk) {
 	sa.Lock()
 	defer sa.Unlock()


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org