You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/08 00:08:16 UTC

[cassandra] branch cassandra-2.2 updated: Backport changes from CASSANDRA-16120 to other branches

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 0eb8cec  Backport changes from CASSANDRA-16120 to other branches
0eb8cec is described below

commit 0eb8cec5c75a7dc9503f59505bed67da65c49503
Author: David Capwell <dc...@apache.org>
AuthorDate: Wed Oct 7 14:46:03 2020 -0700

    Backport changes from CASSANDRA-16120 to other branches
    
    patch by David Capwell; reviewed by Alex Petrov, Jordan West, Yifan Cai for CASSANDRA-16120
---
 test/conf/logback-dtest.xml                        |  27 +----
 .../distributed/impl/AbstractCluster.java          |  22 +++-
 ...nstanceIDDefiner.java => ClusterIDDefiner.java} |  22 ++--
 .../cassandra/distributed/impl/FileLogAction.java  | 133 +++++++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |  23 +++-
 .../distributed/impl/InstanceIDDefiner.java        |  12 +-
 .../cassandra/distributed/test/JVMDTestTest.java   |  28 +++++
 7 files changed, 229 insertions(+), 38 deletions(-)

diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
index 4282fee..9b3216d 100644
--- a/test/conf/logback-dtest.xml
+++ b/test/conf/logback-dtest.xml
@@ -18,35 +18,18 @@
 -->
 
 <configuration debug="false" scan="true" scanPeriod="60 seconds">
+  <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
   <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
 
   <!-- Shutdown hook ensures that async appender flushes -->
   <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
-  <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
-
-    <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
-    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-      <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
-      <minIndex>1</minIndex>
-      <maxIndex>20</maxIndex>
-    </rollingPolicy>
-
-    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-      <maxFileSize>20MB</maxFileSize>
-    </triggeringPolicy>
-
+  <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+    <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
     <encoder>
       <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
     </encoder>
-    <immediateFlush>false</immediateFlush>
-  </appender>
-
-  <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
-    <discardingThreshold>0</discardingThreshold>
-    <maxFlushTime>0</maxFlushTime>
-    <queueSize>1024</queueSize>
-    <appender-ref ref="INSTANCEFILE"/>
+    <immediateFlush>true</immediateFlush>
   </appender>
 
   <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
@@ -79,7 +62,7 @@
   <logger name="org.apache.hadoop" level="WARN"/>
 
   <root level="DEBUG">
-    <appender-ref ref="INSTANCEASYNCFILE" />
+    <appender-ref ref="INSTANCEFILE" /> <!-- use a blocking (synchronous) appender to avoid races between appending to the log and searching it -->
     <appender-ref ref="INSTANCESTDERR" />
     <appender-ref ref="INSTANCESTDOUT" />
   </root>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 9793add..f74078e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -45,11 +44,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -59,6 +58,7 @@ import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.AbstractBuilder;
@@ -106,6 +106,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class);
     private static final AtomicInteger GENERATION = new AtomicInteger();
 
+    private final UUID clusterId = UUID.randomUUID();
     private final File root;
     private final ClassLoader sharedClassLoader;
     private final int subnet;
@@ -237,6 +238,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         }
 
         @Override
+        public boolean getLogsEnabled()
+        {
+            return delegate().getLogsEnabled();
+        }
+
+        @Override
+        public LogAction logs()
+        {
+            return delegate().logs();
+        }
+
+        @Override
         public synchronized void setVersion(Versions.Version version)
         {
             if (!isShutdown)
@@ -307,6 +320,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
 
         InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount);
+        config.set("dtest.api.cluster_id", clusterId);
         if (configUpdater != null)
             configUpdater.accept(config);
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java b/test/distributed/org/apache/cassandra/distributed/impl/ClusterIDDefiner.java
similarity index 72%
copy from test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java
copy to test/distributed/org/apache/cassandra/distributed/impl/ClusterIDDefiner.java
index d32bd77..4aad164 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/ClusterIDDefiner.java
@@ -18,24 +18,30 @@
 
 package org.apache.cassandra.distributed.impl;
 
+import java.util.Objects;
+
 import ch.qos.logback.core.PropertyDefinerBase;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 
 /**
  * Used by logback to find/define property value, see logback-dtest.xml
  */
-public class InstanceIDDefiner extends PropertyDefinerBase
+public class ClusterIDDefiner extends PropertyDefinerBase
 {
-    // Instantiated per classloader, set by Instance
-    private static volatile String instanceId = "<main>";
-    public static void setInstanceId(int id)
+    private static volatile String ID = "<main>";
+
+    public static void setId(String id)
+    {
+        ID = Objects.requireNonNull(id);
+    }
+
+    public static String getId()
     {
-        instanceId = "node" + id;
-        NamedThreadFactory.setGlobalPrefix("node" + id + "_");
+        return ID;
     }
 
+    @Override
     public String getPropertyValue()
     {
-        return instanceId;
+        return ID;
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/FileLogAction.java b/test/distributed/org/apache/cassandra/distributed/impl/FileLogAction.java
new file mode 100644
index 0000000..1d65fe1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/FileLogAction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+
+import org.apache.cassandra.distributed.api.LogAction;
+import org.apache.cassandra.distributed.api.LineIterator;
+
+public class FileLogAction implements LogAction
+{
+    private final File file;
+
+    public FileLogAction(File file)
+    {
+        this.file = Objects.requireNonNull(file);
+    }
+
+    @Override
+    public long mark()
+    {
+        return file.length();
+    }
+
+    @Override
+    public LineIterator match(long startPosition, Predicate<String> fn)
+    {
+        RandomAccessFile reader;
+        try
+        {
+            reader = new RandomAccessFile(file, "r");
+        }
+        catch (FileNotFoundException e)
+        {
+            // if file isn't present, don't return an empty stream as it looks the same as no log lines matched
+            throw new UncheckedIOException(e);
+        }
+        if (startPosition > 0) // -1 is used to disable seeking, so skip the seek for any negative value or 0 (the default offset)
+        {
+            try
+            {
+                reader.seek(startPosition);
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException("Unable to seek to " + startPosition, e);
+            }
+        }
+        return new FileLineIterator(reader, fn);
+    }
+
+    private static final class FileLineIterator extends AbstractIterator<String> implements LineIterator
+    {
+        private final RandomAccessFile reader;
+        private final Predicate<String> fn;
+
+        private FileLineIterator(RandomAccessFile reader, Predicate<String> fn)
+        {
+            this.reader = reader;
+            this.fn = fn;
+        }
+
+        @Override
+        public long mark()
+        {
+            try
+            {
+                return reader.getFilePointer();
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        @Override
+        protected String computeNext()
+        {
+            try
+            {
+                String s;
+                while ((s = reader.readLine()) != null)
+                {
+                    if (fn.test(s))
+                        return s;
+                }
+                return endOfData();
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        @Override
+        public void close()
+        {
+            try
+            {
+                Closeables.close(reader, true);
+            }
+            catch (IOException impossible)
+            {
+                throw new AssertionError(impossible);
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b8bb60c..db97e79 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -38,7 +39,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
-
 import javax.management.ListenerNotFoundException;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -70,6 +70,7 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
@@ -128,6 +129,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     {
         super("node" + config.num(), classLoader);
         this.config = config;
+        Object clusterId = Objects.requireNonNull(config.get("dtest.api.cluster_id"), "cluster_id is not defined");
+        ClusterIDDefiner.setId("cluster-" + clusterId);
         InstanceIDDefiner.setInstanceId(config.num());
         FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress());
 
@@ -138,6 +141,24 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     }
 
     @Override
+    public boolean getLogsEnabled()
+    {
+        return true;
+    }
+
+    @Override
+    public LogAction logs()
+    {
+        // the path used is defined by test/conf/logback-dtest.xml and looks like the following
+        // ./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log
+        String tag = System.getProperty("cassandra.testtag", "cassandra.testtag_IS_UNDEFINED");
+        String suite = System.getProperty("suitename", "suitename_IS_UNDEFINED");
+        String clusterId = ClusterIDDefiner.getId();
+        String instanceId = InstanceIDDefiner.getInstanceId();
+        return new FileLogAction(new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId)));
+    }
+
+    @Override
     public IInstanceConfig config()
     {
         return config;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java
index d32bd77..2aa084c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceIDDefiner.java
@@ -27,15 +27,21 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 public class InstanceIDDefiner extends PropertyDefinerBase
 {
     // Instantiated per classloader, set by Instance
-    private static volatile String instanceId = "<main>";
+    private static volatile String INSTANCE_ID = "<main>";
+
     public static void setInstanceId(int id)
     {
-        instanceId = "node" + id;
+        INSTANCE_ID = "node" + id;
         NamedThreadFactory.setGlobalPrefix("node" + id + "_");
     }
 
+    public static String getInstanceId()
+    {
+        return INSTANCE_ID;
+    }
+
     public String getPropertyValue()
     {
-        return instanceId;
+        return INSTANCE_ID;
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java b/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
index 795b4ea..3a1a0a8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
@@ -19,11 +19,20 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.LogAction;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -50,4 +59,23 @@ public class JVMDTestTest extends TestBaseImpl
             assertEquals(1000, (long) res[0][0]);
         }
     }
+
+    @Test
+    public void instanceLogs() throws IOException, TimeoutException
+    {
+        try (Cluster cluster = init(Cluster.build(2).withConfig(c -> c.with(Feature.values())).start()))
+        {
+            // debug logging is turned on so we will see debug logs
+            Assert.assertFalse(cluster.get(1).logs().grep("^DEBUG").getResult().isEmpty());
+            // make sure an exception is thrown in the cluster
+            LogAction logs = cluster.get(2).logs();
+            long mark = logs.mark(); // get the current position so watching doesn't see any previous exceptions
+            cluster.get(2).runOnInstance(() -> {
+                // pretend that an uncaught exception was thrown
+                LoggerFactory.getLogger(CassandraDaemon.class).error("Error", new RuntimeException("fail without fail"));
+            });
+            List<String> errors = logs.watchFor(mark, "^ERROR").getResult();
+            Assert.assertFalse(errors.isEmpty());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org