Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/29 22:00:29 UTC

kafka git commit: KAFKA-4843: More efficient round-robin scheduler

Repository: kafka
Updated Branches:
  refs/heads/trunk 6feaa8a58 -> 84a14fec2


KAFKA-4843: More efficient round-robin scheduler

- Improves Streams efficiency by more than 200K requests/second (small 100-byte requests)
- Brings Streams efficiency very close to that of a pure consumer (see results in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/746/console)

- Maintains the same fairness across tasks
- Between poll() calls, schedules all records currently buffered in the queue, not just one record per task (see the sketch below)
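
For readers skimming the diff, a minimal, self-contained sketch of the new scheduling loop follows. SimpleTask and processAllBuffered are hypothetical stand-ins for StreamTask and StreamThread#processAndPunctuate, not the actual Kafka classes: each round takes at most one record from every task, and rounds repeat until every buffer is drained.

    import java.util.ArrayDeque;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Queue;

    public class RoundRobinSketch {

        // Hypothetical stand-in for StreamTask: process() consumes one
        // buffered record and reports whether it did any work.
        static final class SimpleTask {
            private final Queue<String> buffer = new ArrayDeque<>();

            void addRecord(final String record) {
                buffer.add(record);
            }

            boolean process() {
                final String record = buffer.poll();
                if (record == null) {
                    return false;          // nothing buffered, no work done
                }
                System.out.println("processed " + record);
                return true;
            }
        }

        // Mirrors the do/while in the patched StreamThread: keep looping
        // over all tasks until a full round does no work.
        static long processAllBuffered(final Map<String, SimpleTask> tasks) {
            long totalProcessed = 0;
            long processedThisRound;
            do {
                processedThisRound = 0;
                for (final SimpleTask task : tasks.values()) {
                    if (task.process()) {
                        processedThisRound++;
                    }
                }
                totalProcessed += processedThisRound;
            } while (processedThisRound != 0);
            return totalProcessed;
        }

        public static void main(final String[] args) {
            final Map<String, SimpleTask> tasks = new LinkedHashMap<>();
            final SimpleTask a = new SimpleTask();
            a.addRecord("a1");
            a.addRecord("a2");
            final SimpleTask b = new SimpleTask();
            b.addRecord("b1");
            tasks.put("task-a", a);
            tasks.put("task-b", b);
            // prints a1, b1, a2 (one record per task per round), then total: 3
            System.out.println("total: " + processAllBuffered(tasks));
        }
    }

The old loop, by contrast, processed only one record per task per pass before deciding whether to return to poll(), which is why deep buffers cost many extra consumer round trips.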

Author: Eno Thereska <en...@confluent.io>
Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2643 from enothereska/minor-schedule-round-robin


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84a14fec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84a14fec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84a14fec

Branch: refs/heads/trunk
Commit: 84a14fec299749a208251bce1a0eb9c1a8241d08
Parents: 6feaa8a
Author: Eno Thereska <en...@confluent.io>
Authored: Wed Mar 29 15:00:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 29 15:00:26 2017 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java |  17 +-
 .../processor/internals/StreamThread.java       | 225 ++++++++++++-------
 .../processor/internals/StreamTaskTest.java     |  45 ++--
 .../streams/streams_simple_benchmark_test.py    |   4 +-
 tests/kafkatest/services/streams.py             |   2 +-
 5 files changed, 188 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7bd4be4..092d6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -169,19 +169,26 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     /**
-     * Process one record
+     * @return The number of records left in the buffer of this task's partition group
+     */
+    public int numBuffered() {
+        return partitionGroup.numBuffered();
+    }
+
+    /**
+     * Process one record.
      *
-     * @return number of records left in the buffer of this task's partition group after the processing is done
+     * @return true if a record was processed, false otherwise.
      */
     @SuppressWarnings("unchecked")
-    public int process() {
+    public boolean process() {
         // get the next record to process
         StampedRecord record = partitionGroup.nextRecord(recordInfo);
 
         // if there is no record to process, return immediately
         if (record == null) {
             requiresPoll = true;
-            return 0;
+            return false;
         }
 
         requiresPoll = false;
@@ -224,7 +231,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             processorContext.setCurrentNode(null);
         }
 
-        return partitionGroup.numBuffered();
+        return true;
     }
 
     private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) {
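
The hunk above splits the old int-returning process() into a boolean process() plus a separate numBuffered() accessor. A hedged usage sketch of a drain loop under the new contract (not code from this commit; it assumes a StreamTask in the same internals package with records already added via addRecords(...)):

    // Drain one task's buffer under the new boolean contract.
    static int drain(final StreamTask task) {
        int processed = 0;
        while (task.process()) {       // true as long as a record was consumed
            processed++;
        }
        // numBuffered() is now the separate way to ask how much is left
        System.out.println("records still buffered: " + task.numBuffered());
        return processed;
    }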

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 61d7d72..b90bde5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -223,6 +223,7 @@ public class StreamThread extends Thread {
     private final TaskCreator taskCreator = new TaskCreator();
 
     final ConsumerRebalanceListener rebalanceListener;
+    private final static int UNLIMITED_RECORDS = -1;
 
     public synchronized boolean isInitialized() {
         return state == State.RUNNING;
@@ -519,107 +520,168 @@ public class StreamThread extends Thread {
         return Math.max(this.timerStartedMs - previousTimeMs, 0);
     }
 
-    private void runLoop() {
-        int totalNumBuffered = 0;
-        boolean requiresPoll = true;
-        boolean polledRecords = false;
-
-        consumer.subscribe(sourceTopicPattern, rebalanceListener);
-
-        while (stillRunning()) {
-            this.timerStartedMs = time.milliseconds();
-
-            // try to fetch some records if necessary
-            if (requiresPoll) {
-                requiresPoll = false;
-
-                boolean longPoll = totalNumBuffered == 0;
+    /**
+     * Get the next batch of records by polling.
+     * @return The next batch of records, or null if none are available.
+     */
+    private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+        ConsumerRecords<byte[], byte[]> records = null;
 
-                ConsumerRecords<byte[], byte[]> records = null;
+        try {
+            records = consumer.poll(pollTimeMs);
+        } catch (NoOffsetForPartitionException ex) {
+            TopicPartition partition = ex.partition();
+            if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
+                consumer.seekToBeginning(ex.partitions());
+            } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                consumer.seekToEnd(ex.partitions());
+                log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
+            } else {
 
-                try {
-                    records = consumer.poll(longPoll ? this.pollTimeMs : 0);
-                } catch (NoOffsetForPartitionException ex) {
-                    TopicPartition partition = ex.partition();
-                    if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                        log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
-                        consumer.seekToBeginning(ex.partitions());
-                    } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                        consumer.seekToEnd(ex.partitions());
-                        log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
-                    } else {
+                if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                    setState(State.PENDING_SHUTDOWN);
+                    String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+                        " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+                        "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
+                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
+                }
 
-                        if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
-                            setState(State.PENDING_SHUTDOWN);
-                            String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
-                                    " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
-                                    "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
-                            throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
-                        }
+                if (originalReset.equals("earliest")) {
+                    consumer.seekToBeginning(ex.partitions());
+                } else if (originalReset.equals("latest")) {
+                    consumer.seekToEnd(ex.partitions());
+                }
+                log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
+            }
 
-                        if (originalReset.equals("earliest")) {
-                            consumer.seekToBeginning(ex.partitions());
-                        } else if (originalReset.equals("latest")) {
-                            consumer.seekToEnd(ex.partitions());
-                        }
-                        log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
-                    }
+        }
 
-                }
+        if (rebalanceException != null)
+            throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
 
-                if (rebalanceException != null)
-                    throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
+        return records;
+    }
 
-                if (records != null && !records.isEmpty()) {
-                    int numAddedRecords = 0;
+    /**
+     * Take records and add them to each respective task
+     * @param records Records, can be null
+     */
+    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
+        if (records != null && !records.isEmpty()) {
+            int numAddedRecords = 0;
 
-                    for (TopicPartition partition : records.partitions()) {
-                        StreamTask task = activeTasksByPartition.get(partition);
-                        numAddedRecords += task.addRecords(partition, records.records(partition));
-                    }
-                    streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
-                    polledRecords = true;
-                } else {
-                    polledRecords = false;
-                }
+            for (TopicPartition partition : records.partitions()) {
+                StreamTask task = activeTasksByPartition.get(partition);
+                numAddedRecords += task.addRecords(partition, records.records(partition));
+            }
+            streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
+        }
+    }
 
-                // only record poll latency is long poll is required
-                if (longPoll) {
-                    streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
+    /**
+     * Schedule record processing by selecting which record is processed next. Commits may
+     * happen as records are processed.
+     * @param tasks The tasks that have records.
+     * @param recordsProcessedBeforeCommit Number of records to be processed before commit is called.
+     *                                     If UNLIMITED_RECORDS, then commit is never called.
+     * @return Number of records processed since last commit.
+     */
+    private long processAndPunctuate(final Map<TaskId, StreamTask> tasks,
+                                     final long recordsProcessedBeforeCommit) {
+
+        long totalProcessedEachRound;
+        long totalProcessedSinceLastMaybeCommit = 0;
+        // Round-robin scheduling by taking one record from each task repeatedly
+        // until no task has any records left
+        do {
+            totalProcessedEachRound = 0;
+            for (StreamTask task : tasks.values()) {
+                // try to process one record from this task; if it succeeds,
+                // more may still be buffered, waiting for the next round
+                if (task.process()) {
+                    totalProcessedEachRound++;
+                    totalProcessedSinceLastMaybeCommit++;
                 }
             }
+            if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS &&
+                totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) {
+                long processLatency = computeLatency();
+                // record the per-record latency before resetting the counter,
+                // otherwise the division below would be by zero
+                streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastMaybeCommit,
+                    timerStartedMs);
+                maybeCommit(this.timerStartedMs);
+                totalProcessedSinceLastMaybeCommit = 0;
+            }
+        } while (totalProcessedEachRound != 0);
 
-            // try to process one fetch record from each task via the topology, and also trigger punctuate
-            // functions if necessary, which may result in more records going through the topology in this loop
-            if (totalNumBuffered > 0 || polledRecords) {
-                totalNumBuffered = 0;
-
-                if (!activeTasks.isEmpty()) {
-                    for (StreamTask task : activeTasks.values()) {
+        // go over the tasks again to punctuate or commit
+        for (StreamTask task : tasks.values()) {
+            maybePunctuate(task);
+            if (task.commitNeeded())
+                commitOne(task);
+        }
 
-                        totalNumBuffered += task.process();
+        return totalProcessedSinceLastMaybeCommit;
+    }
 
-                        requiresPoll = requiresPoll || task.requiresPoll();
+    /**
+     * Adjust the number of records that should be processed by the scheduler. This avoids
+     * scenarios where the processing time is higher than the commit time.
+     * @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler.
+     * @param totalProcessed Total number of records processed in this last round.
+     * @param processLatency Total processing latency in ms processed in this last round.
+     * @param commitTime Desired commit time in ms.
+     * @return An adjusted number of records to be processed in the next round.
+     */
+    private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed,
+                                                    final long processLatency, final long commitTime) {
+        long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
+        // check if process latency is larger than commit latency
+        // note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so
+        // we will never process all records again. This might be an issue if the initial measurement
+        // was off due to a slow start.
+        if (processLatency > commitTime) {
+            // push down
+            recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
+            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
+                logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+        } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
+            // push up
+            recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
+            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
+                logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+        }
 
-                        streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs);
+        return recordsProcessedBeforeCommit;
+    }
 
-                        maybePunctuate(task);
+    /**
+     * Main event loop for polling and processing records through topologies.
+     */
+    private void runLoop() {
+        long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
+        consumer.subscribe(sourceTopicPattern, rebalanceListener);
 
-                        if (task.commitNeeded())
-                            commitOne(task);
-                    }
+        while (stillRunning()) {
+            this.timerStartedMs = time.milliseconds();
 
-                } else {
-                    // even when no task is assigned, we must poll to get a task.
-                    requiresPoll = true;
+            // try to fetch some records if necessary
+            ConsumerRecords<byte[], byte[]> records = pollRequests(this.pollTimeMs);
+            if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
+                streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
+                addRecordsToTasks(records);
+                final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit);
+                if (totalProcessed > 0) {
+                    final long processLatency = computeLatency();
+                    streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
+                        timerStartedMs);
+                    recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
+                        processLatency, commitTimeMs);
                 }
-
-            } else {
-                requiresPoll = true;
             }
+
             maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks();
-
             maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
@@ -692,8 +754,9 @@ public class StreamThread extends Thread {
     protected void maybeCommit(final long now) {
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-            log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed",
-                    logPrefix, commitTimeMs, activeTasks, standbyTasks);
+
+            log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed by {}ms",
+                    logPrefix, activeTasks, standbyTasks, commitTimeMs, now - lastCommitMs);
 
             commitAll();
             lastCommitMs = now;
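
To make adjustRecordsProcessedBeforeCommit concrete, here is a self-contained restatement of its proportional formula with a worked example (CommitBudgetSketch and adjust are illustrative names, not part of the commit). The idea: pick a record budget whose expected processing time matches the commit interval, records ≈ commitTime * totalProcessed / processLatency.

    public class CommitBudgetSketch {
        static final long UNLIMITED_RECORDS = -1;

        static long adjust(final long prev, final long totalProcessed,
                           final long processLatencyMs, final long commitTimeMs) {
            if (processLatencyMs > commitTimeMs) {
                // processing took longer than the commit interval:
                // shrink the budget proportionally (push down)
                return Math.max(1, (commitTimeMs * totalProcessed) / processLatencyMs);
            }
            if (prev != UNLIMITED_RECORDS && processLatencyMs > 0) {
                // we were throttled before and now run faster:
                // grow the budget proportionally (push up)
                return Math.max(1, (commitTimeMs * totalProcessed) / processLatencyMs);
            }
            return UNLIMITED_RECORDS;   // never throttled and still fast enough
        }

        public static void main(final String[] args) {
            // e.g. 5000 records took 400 ms, but commits are due every 100 ms:
            // the budget becomes 100 * 5000 / 400 = 1250 records between commits
            System.out.println(adjust(UNLIMITED_RECORDS, 5000, 400, 100));   // 1250
        }
    }

As the in-diff comment notes, once throttling kicks in the budget is meant never to return to UNLIMITED_RECORDS; it is only re-scaled from fresh measurements each round, which can under-process if the first measurement hit a slow start.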

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c1dce59..7c9f46b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -172,27 +172,33 @@ public class StreamTaskTest {
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
-        assertEquals(5, task.process());
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
-        assertEquals(4, task.process());
+        assertTrue(task.process());
+        assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
-        assertEquals(3, task.process());
+        assertTrue(task.process());
+        assertEquals(3, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
-        assertEquals(2, task.process());
+        assertTrue(task.process());
+        assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
-        assertEquals(1, task.process());
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertEquals(0, task.process());
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
     }
@@ -234,7 +240,7 @@ public class StreamTaskTest {
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
-        assertEquals(5, task.process());
+        assertTrue(task.process());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
@@ -251,21 +257,21 @@ public class StreamTaskTest {
         assertTrue(consumer.paused().contains(partition1));
         assertTrue(consumer.paused().contains(partition2));
 
-        assertEquals(7, task.process());
+        assertTrue(task.process());
         assertEquals(2, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
         assertEquals(1, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition2));
 
-        assertEquals(6, task.process());
+        assertTrue(task.process());
         assertEquals(3, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
         assertEquals(1, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition2));
 
-        assertEquals(5, task.process());
+        assertTrue(task.process());
         assertEquals(3, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
@@ -289,40 +295,47 @@ public class StreamTaskTest {
 
         assertTrue(task.maybePunctuate());
 
-        assertEquals(5, task.process());
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
         assertFalse(task.maybePunctuate());
 
-        assertEquals(4, task.process());
+        assertTrue(task.process());
+        assertEquals(4, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertTrue(task.maybePunctuate());
 
-        assertEquals(3, task.process());
+        assertTrue(task.process());
+        assertEquals(3, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertFalse(task.maybePunctuate());
 
-        assertEquals(2, task.process());
+        assertTrue(task.process());
+        assertEquals(2, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
         assertTrue(task.maybePunctuate());
 
-        assertEquals(1, task.process());
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
         assertFalse(task.maybePunctuate());
 
-        assertEquals(0, task.process());
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
 
+        assertFalse(task.process());
         assertFalse(task.maybePunctuate());
 
         processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index c9f970e..9f0e457 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -30,12 +30,12 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-        self.num_records = 10000000L
+        self.num_records = 20000000L
         self.replication = 1
 
 
     @cluster(num_nodes=9)
-    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3])
+    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 4f8f1a3..e7be947 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             self.logger.info("Restarting Kafka Streams on " + str(node.account))
             self.start_node(node)
 
-    def wait(self, timeout_sec=720):
+    def wait(self, timeout_sec=1440):
         for node in self.nodes:
             self.wait_node(node, timeout_sec)