Posted to commits@kafka.apache.org by bb...@apache.org on 2020/04/15 19:41:57 UTC

[kafka] branch 2.5 updated: Kafka 9739: Fix for 2.5 branch (#8492)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f80912a  Kafka 9739: Fix for 2.5 branch (#8492)
f80912a is described below

commit f80912aed9ad23310b35ab948df3826ddb622a25
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Apr 15 15:41:28 2020 -0400

    Kafka 9739: Fix for 2.5 branch (#8492)
    
    This is a port of #8400 for the 2.5 branch
    
    For some context, when building a streams application, the optimizer keeps track of the key-changing operations and any repartition nodes that are descendants of the key-changer. During the optimization phase (if enabled), any repartition nodes are logically collapsed into one. The optimizer updates the graph by inserting the single repartition node between the key-changing node and its first child node. This graph update process is done by searching for a node that has the key-changi [...]
    
    The one exception to this rule involves merge nodes. If a merge node is a descendant of the key-changing node, then during the optimization phase the map tracking key-changers to repartition nodes is updated to use the merge node as the key. The optimization process then updates the graph to place the single repartition node between the merge node and its first child node.
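    
    The re-keying step described above can be sketched roughly as follows. This is a simplified illustration, not the code in InternalStreamsBuilder: node identities are plain strings, and the class name MergeRekeySketch and the helper rekeyToMergeNode are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeRekeySketch {

    // Hypothetical helper: moves the repartition nodes tracked under each
    // key-changing parent onto a single entry keyed by the merge node.
    static void rekeyToMergeNode(final Map<String, Set<String>> keyChangerToRepartitionNodes,
                                 final String mergeNode,
                                 final List<String> keyChangingParents) {
        final Set<String> collected = new LinkedHashSet<>();
        for (final String parent : keyChangingParents) {
            // Drop the old entry and keep whatever it was tracking.
            final Set<String> nodes = keyChangerToRepartitionNodes.remove(parent);
            if (nodes != null) {
                collected.addAll(nodes);
            }
        }
        if (!collected.isEmpty()) {
            keyChangerToRepartitionNodes.put(mergeNode, collected);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> tracked = new LinkedHashMap<>();
        tracked.put("map-1", new LinkedHashSet<>(Arrays.asList("repartition-a")));
        tracked.put("map-2", new LinkedHashSet<>(Arrays.asList("repartition-b")));

        rekeyToMergeNode(tracked, "merge", Arrays.asList("map-1", "map-2"));

        // The two key-changer entries are replaced by one merge-node entry.
        System.out.println(tracked);
    }
}
```

    After this step the optimizer only sees the merge node as the anchor for the collapsed repartition node, which is why the position of the repartition nodes relative to the merge node matters.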
    
    The error in KAFKA-9739 occurred because the code assumed that the repartition nodes are children of the merge node. But in the topology from KAFKA-9739, the repartition node was a parent of the merge node. So the search for the first child of the merge node found nothing, resulting in StreamsException(Found a null keyChangingChild node for..)
    
    This PR fixes this bug by first checking that all repartition nodes for optimization are children of the merge node.
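    
    A minimal sketch of that check, assuming a toy graph in which each node only knows its parents. The Node class and the names MergeGuardSketch, hasAncestor and allBelowMerge are hypothetical stand-ins for this illustration; the actual change uses StreamsGraphNode and findParentNodeMatching, as shown in the diff below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeGuardSketch {

    // Minimal stand-in for a graph node; not the real StreamsGraphNode.
    static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();

        Node(final String name, final Node... parents) {
            this.name = name;
            this.parents.addAll(Arrays.asList(parents));
        }
    }

    // True if 'ancestor' is reachable from 'node' by following parent links.
    static boolean hasAncestor(final Node node, final Node ancestor) {
        for (final Node parent : node.parents) {
            if (parent == ancestor || hasAncestor(parent, ancestor)) {
                return true;
            }
        }
        return false;
    }

    // The guard: every repartition node must sit below the merge node.
    static boolean allBelowMerge(final Node mergeNode, final List<Node> repartitionNodes) {
        return repartitionNodes.stream().allMatch(n -> hasAncestor(n, mergeNode));
    }

    public static void main(final String[] args) {
        // Shape that triggered the bug: key-changer -> repartition -> merge
        final Node keyChanger = new Node("key-changer");
        final Node repartitionAbove = new Node("repartition-above", keyChanger);
        final Node merge = new Node("merge", repartitionAbove);
        // Shape the optimizer expects: merge -> repartition
        final Node repartitionBelow = new Node("repartition-below", merge);

        System.out.println(allBelowMerge(merge, Arrays.asList(repartitionBelow))); // true
        System.out.println(allBelowMerge(merge, Arrays.asList(repartitionAbove))); // false
    }
}
```

    When the guard returns false, the key-changing node is not re-keyed to the merge node, so the optimizer never searches for a child of the merge node that does not exist.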
    
    Reviewers: John Roesler <jo...@confluent.io>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  40 +++--
 .../kstream/internals/graph/StreamsGraphTest.java  | 186 +++++++++++++++++++++
 2 files changed, 209 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 9509431..636fb70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -64,7 +64,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
-    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
+    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
     private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
     private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();
 
@@ -328,10 +328,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
-        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
+        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
 
         while (entryIterator.hasNext()) {
-            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
+            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry = entryIterator.next();
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -396,11 +396,13 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
-            final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
-            for (final StreamsGraphNode key : keys) {
-                final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
-                if (maybeParentKey != null) {
-                    mergeNodesToKeyChangers.get(mergeNode).add(key);
+            final Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
+            for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry : entrySet) {
+                if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) {
+                    final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(entry.getKey()));
+                    if (maybeParentKey != null) {
+                        mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey());
+                    }
                 }
             }
         }
@@ -408,7 +410,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
             final StreamsGraphNode mergeKey = entry.getKey();
             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
-            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
+            final LinkedHashSet<OptimizableRepartitionNode<?, ?>> repartitionNodes = new LinkedHashSet<>();
             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                 mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
@@ -421,12 +423,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
-                                                             final Serde keySerde,
-                                                             final Serde valueSerde) {
+    private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode mergeNode,
+                                                    final LinkedHashSet<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
+        return repartitionNodes.stream().allMatch(n -> findParentNodeMatching(n, gn -> gn.parentNodes().contains(mergeNode)) != null);
+    }
+
+    private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final String repartitionTopicName,
+                                                                          final Serde<K> keySerde,
+                                                                          final Serde<V> valueSerde) {
 
-        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
         KStreamImpl.createRepartitionedSource(this,
                                               keySerde,
                                               valueSerde,
@@ -452,16 +458,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         return null;
     }
 
-    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
         return repartitionNodes.iterator().next().repartitionTopic();
     }
 
     @SuppressWarnings("unchecked")
-    private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+    private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
         Serde keySerde = null;
         Serde valueSerde = null;
 
-        for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) {
+        for (final OptimizableRepartitionNode<?, ?> repartitionNode : repartitionNodes) {
             if (keySerde == null && repartitionNode.keySerde() != null) {
                 keySerde = repartitionNode.keySerde();
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e2006e6..c1032f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,14 +22,22 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -47,6 +55,8 @@ import static org.junit.Assert.assertEquals;
 public class StreamsGraphTest {
 
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+    private Initializer<String> initializer;
+    private Aggregator<String, String, String> aggregator;
 
     // Test builds topology in succesive manner but only graph node not yet processed written to topology
 
@@ -101,6 +111,77 @@ public class StreamsGraphTest {
         builder.build(properties);
     }
 
+    @Test 
+    @SuppressWarnings("unchecked")
+    // Topology in this test from https://issues.apache.org/jira/browse/KAFKA-9739
+    public void shouldNotThrowNPEWithMergeNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        initializer = () -> "";
+        aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
+        final TransformerSupplier<String, String, KeyValue<String, String>> transformSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() {
+            @Override
+            public void init(final ProcessorContext context) {
+
+            }
+
+            @Override
+            public KeyValue<String, String> transform(final String key, final String value) {
+                return KeyValue.pair(key, value);
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+
+        final KStream<String, String> retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String()))
+                .transform(transformSupplier)
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), Suppressed.BufferConfig.maxBytes(64_000_000)))
+                .toStream()
+                .flatMap((k, v) -> new ArrayList<>());
+
+        final KTable<String, String> idTable = builder.stream("id-table-topic", Consumed.with(Serdes.String(), Serdes.String()))
+                .flatMap((k, v) -> new ArrayList<KeyValue<String, String>>())
+                .peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId + " " + recipientId))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> joinStream = builder.stream("internal-topic-command", Consumed.with(Serdes.String(), Serdes.String()))
+                .peek((subscriptionId, command) -> System.out.println("stdoutput"))
+                .mapValues((k, v) -> v)
+                .merge(retryStream)
+                .leftJoin(idTable, (v1, v2) -> v1 + v2,
+                        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
+
+        final KStream<String, String>[] branches = joinStream.branch((k, v) -> v.equals("some-value"), (k, v) -> true);
+
+        branches[0].map(KeyValue::pair)
+                .peek((recipientId, command) -> System.out.println("printing out"))
+                .to("external-command", Produced.with(Serdes.String(), Serdes.String()));
+
+        branches[1].filter((k, v) -> v != null)
+                .peek((subscriptionId, wrapper) -> System.out.println("Printing output"))
+                .mapValues((k, v) -> v)
+                .to("dlq-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));
+
+        final Topology topology = builder.build(properties);
+        assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString());
+    }
+
     @Test
     public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
@@ -291,4 +372,109 @@ public class StreamsGraphTest {
         "    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
         "      <-- KSTREAM-MERGE-0000000006\n\n";
 
+
+    private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
+            "      --> KSTREAM-TRANSFORM-0000000001\n" +
+            "    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n" +
+            "      <-- KSTREAM-TRANSFORM-0000000001\n" +
+            "    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
+            "      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000003\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+            "      --> KTABLE-SUPPRESS-0000000007\n" +
+            "      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n" +
+            "    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" +
+            "      --> KSTREAM-PEEK-0000000020\n" +
+            "    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +
+            "      --> KTABLE-TOSTREAM-0000000009\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000003\n" +
+            "    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000021\n" +
+            "      <-- KSTREAM-SOURCE-0000000019\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" +
+            "      --> KSTREAM-FLATMAP-0000000010\n" +
+            "      <-- KTABLE-SUPPRESS-0000000007\n" +
+            "    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" +
+            "      --> KSTREAM-MERGE-0000000022\n" +
+            "      <-- KTABLE-TOSTREAM-0000000009\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-MERGE-0000000022\n" +
+            "      <-- KSTREAM-PEEK-0000000020\n" +
+            "    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000024\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" +
+            "    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000023\n" +
+            "      <-- KSTREAM-MERGE-0000000022\n" +
+            "    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000024\n" +
+            "\n" +
+            "  Sub-topology: 2\n" +
+            "    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" +
+            "      --> KSTREAM-FLATMAP-0000000012\n" +
+            "    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n" +
+            "      <-- KSTREAM-SOURCE-0000000011\n" +
+            "    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink\n" +
+            "      <-- KSTREAM-FLATMAP-0000000012\n" +
+            "    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" +
+            "      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n" +
+            "\n" +
+            "  Sub-topology: 3\n" +
+            "    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" +
+            "      --> KSTREAM-LEFTJOIN-0000000026\n" +
+            "    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
+            "      --> KSTREAM-BRANCH-0000000027\n" +
+            "      <-- KSTREAM-SOURCE-0000000025\n" +
+            "    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" +
+            "      <-- KSTREAM-LEFTJOIN-0000000026\n" +
+            "    Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" +
+            "      <-- KSTREAM-BRANCH-0000000027\n" +
+            "    Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" +
+            "      --> KSTREAM-MAP-0000000030\n" +
+            "      <-- KSTREAM-BRANCH-0000000027\n" +
+            "    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000034\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+            "    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" +
+            "      --> KSTREAM-PEEK-0000000013\n" +
+            "    Processor: KSTREAM-MAP-0000000030 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000031\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000028\n" +
+            "    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000035\n" +
+            "      <-- KSTREAM-FILTER-0000000033\n" +
+            "    Processor: KSTREAM-MAP-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000036\n" +
+            "      <-- KSTREAM-PEEK-0000000034\n" +
+            "    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000015\n" +
+            "      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n" +
+            "    Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000032\n" +
+            "      <-- KSTREAM-MAP-0000000030\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-PEEK-0000000013\n" +
+            "    Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" +
+            "      <-- KSTREAM-PEEK-0000000031\n" +
+            "    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000035\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" +
+            "      <-- KSTREAM-MAP-0000000037\n\n";
 }