Posted to github@beam.apache.org by "zhengbuqian (via GitHub)" <gi...@apache.org> on 2023/02/08 05:08:41 UTC

[GitHub] [beam] zhengbuqian commented on a diff in pull request #23492: Add Windmill support for MultimapState

zhengbuqian commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1099646390


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1999,56 +2005,79 @@ public Iterable<Entry<K, V>> read() {
           Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData = getFuture(false);
           try (Closeable scope = scopedReadState()) {
             Iterable<Entry<ByteString, Iterable<V>>> entries = persistedData.get();
-            Map<Object, ConcatIterables<V>> entryMap = Maps.newHashMap();
-            entries.forEach(
-                entry -> {
-                  try {
-                    final K key = keyCoder.decode(entry.getKey().newInput());
-                    final Object structuralKey = keyCoder.structuralValue(key);
-                    KeyState keyState =
-                        keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
-                    if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT) return;
-                    entryMap.compute(
-                        structuralKey,
-                        (k, v) -> {
-                          if (v == null) v = new ConcatIterables<>();
-                          v.extendWith(entry.getValue());
-                          keyState.existence = KeyExistence.KNOWN_EXIST;
-                          return v;
-                        });
-                  } catch (IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                });
+            // If a key returned by windmill is known to be no longer exist or is completely cached
+            // locally, we can safely ignore the content of this key in windmill. Thus, we filter
+            // entries to filteredEntries which contains only keys that are known to exist and not
+            // fully cached.
+            Iterable<Entry<Object, Iterable<V>>> filteredEntries =
+                Iterables.filter(
+                    Iterables.transform(
+                        entries,
+                        entry -> {
+                          try {
+                            final K key = keyCoder.decode(entry.getKey().newInput());
+                            final Object structuralKey = keyCoder.structuralValue(key);
+                            KeyState keyState =
+                                keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
+                            if (keyState.existence == KeyExistence.UNKNOWN_EXISTENCE) {
+                              keyState.existence = KeyExistence.KNOWN_EXIST;
+                            }
+                            return new AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
+                          } catch (IOException e) {
+                            throw new RuntimeException(e);
+                          }
+                        }),
+                    entry -> {
+                      KeyState keyState = keyStateMap.get(entry.getKey());

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1947,10 +1952,11 @@ public ReadableState<Iterable<K>> readLater() {
     private MultimapIterables<K, V> mergedCachedEntries() {
       MultimapIterables<K, V> result = new MultimapIterables<>();
       for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
-        if (!entry.getValue().localAdditions.isEmpty()) {
+        KeyState keyState = entry.getValue();
+        if (!keyState.localAdditions.isEmpty()) {
           result.extendWith(entry.getValue().originalKey, entry.getValue().localAdditions);

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1999,56 +2005,79 @@ public Iterable<Entry<K, V>> read() {
           Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData = getFuture(false);
           try (Closeable scope = scopedReadState()) {
             Iterable<Entry<ByteString, Iterable<V>>> entries = persistedData.get();
-            Map<Object, ConcatIterables<V>> entryMap = Maps.newHashMap();
-            entries.forEach(
-                entry -> {
-                  try {
-                    final K key = keyCoder.decode(entry.getKey().newInput());
-                    final Object structuralKey = keyCoder.structuralValue(key);
-                    KeyState keyState =
-                        keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
-                    if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT) return;
-                    entryMap.compute(
-                        structuralKey,
-                        (k, v) -> {
-                          if (v == null) v = new ConcatIterables<>();
-                          v.extendWith(entry.getValue());
-                          keyState.existence = KeyExistence.KNOWN_EXIST;
-                          return v;
-                        });
-                  } catch (IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                });
+            // If a key returned by windmill is known to be no longer exist or is completely cached
+            // locally, we can safely ignore the content of this key in windmill. Thus, we filter
+            // entries to filteredEntries which contains only keys that are known to exist and not
+            // fully cached.
+            Iterable<Entry<Object, Iterable<V>>> filteredEntries =
+                Iterables.filter(
+                    Iterables.transform(
+                        entries,
+                        entry -> {
+                          try {
+                            final K key = keyCoder.decode(entry.getKey().newInput());
+                            final Object structuralKey = keyCoder.structuralValue(key);
+                            KeyState keyState =
+                                keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
+                            if (keyState.existence == KeyExistence.UNKNOWN_EXISTENCE) {
+                              keyState.existence = KeyExistence.KNOWN_EXIST;
+                            }
+                            return new AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
+                          } catch (IOException e) {
+                            throw new RuntimeException(e);
+                          }
+                        }),
+                    entry -> {
+                      KeyState keyState = keyStateMap.get(entry.getKey());
+                      return keyState.existence != KeyExistence.KNOWN_NONEXISTENT
+                          && !(keyState.existence == KeyExistence.KNOWN_EXIST
+                              && keyState.valuesCached);
+                    });
+
             if (entries instanceof Weighted) {
               // This is a known amount of data, cache them all.
-              entryMap.forEach(
-                  (structuralKey, values) -> {
+              filteredEntries.forEach(
+                  entry -> {
+                    final Object structuralKey = entry.getKey();
                     KeyState keyState = keyStateMap.get(structuralKey);
-                    if (!keyState.valuesCached) {
-                      keyState.values.extendWith(values);
-                      keyState.valuesCached = true;
-                    }
+                    keyState.existence = KeyExistence.KNOWN_EXIST;
+                    keyState.values.extendWith(entry.getValue());
+                    // We can't set keyState.valuesCached to true here, because there may be more
+                    // paginated values that should not be filtered out in filteredEntries.
                   });
               allKeysKnown = true;
               complete = true;
+              // Unload keys that are not known exist from cache, set valuesCached of all cached
+              // entries to true.
               keyStateMap
                   .entrySet()
                   .removeIf(
-                      entry ->
-                          entry.getValue().existence == KeyExistence.KNOWN_NONEXISTENT
-                              && !entry.getValue().removedLocally);
+                      entry -> {
+                        KeyState keyState = entry.getValue();
+                        boolean shouldRemove =
+                            (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
+                                    && !keyState.removedLocally)
+                                || keyState.existence == KeyExistence.UNKNOWN_EXISTENCE;
+                        keyState.valuesCached = !shouldRemove;
+                        return shouldRemove;
+                      });
               return Iterables.unmodifiableIterable(mergedCachedEntries());
             } else {
-              MultimapIterables<K, V> local = mergedCachedEntries();
-              entryMap.forEach(
-                  (structuralKey, values) -> {
-                    KeyState keyState = keyStateMap.get(structuralKey);
-                    if (!keyState.valuesCached) {
-                      local.extendWith(keyState.originalKey, values);
-                    }
-                  });
-              return Iterables.unmodifiableIterable(local);
+              Iterable<Entry<K, V>> fromWindmill =
+                  Iterables.concat(
+                      Iterables.transform(
+                          Iterables.transform(
+                              filteredEntries,
+                              entry ->
+                                  new AbstractMap.SimpleEntry<>(
+                                      keyStateMap.get(entry.getKey()).originalKey,
+                                      entry.getValue())),
+                          entry ->
+                              Iterables.transform(
+                                  entry.getValue(),
+                                  v -> new AbstractMap.SimpleEntry<>(entry.getKey(), v))));
+              return Iterables.unmodifiableIterable(
+                  Iterables.concat(mergedCachedEntries(), fromWindmill));

Review Comment:
   The API doesn't specify the order of keys. With the new changes, values of different keys will not interleave.
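
   To illustrate the non-interleaving point, here is a minimal sketch of the flattening step. It is not the PR's code: it is eager rather than lazy, uses only `java.util` instead of Guava's `Iterables`, and the names `FlattenSketch` and `flatten` are hypothetical. It only shows that flattening one key group at a time emits each group's values contiguously.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public class FlattenSketch {
  // Hypothetical helper: flattens (key -> values) groups into (key, value)
  // pairs, finishing one group before starting the next.
  static <K, V> List<Entry<K, V>> flatten(Iterable<Entry<K, List<V>>> groups) {
    List<Entry<K, V>> out = new ArrayList<>();
    for (Entry<K, List<V>> group : groups) {
      for (V value : group.getValue()) {
        // All values of this group are emitted back to back.
        out.add(new AbstractMap.SimpleEntry<>(group.getKey(), value));
      }
    }
    return out;
  }
}
```

   For the groups `a -> [1, 2]` and `b -> [3]`, in that order, this yields `(a, 1), (a, 2), (b, 3)`. The order of the groups themselves is whatever the input iteration order happens to be.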



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1999,56 +2005,79 @@ public Iterable<Entry<K, V>> read() {
           Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData = getFuture(false);
           try (Closeable scope = scopedReadState()) {
             Iterable<Entry<ByteString, Iterable<V>>> entries = persistedData.get();
-            Map<Object, ConcatIterables<V>> entryMap = Maps.newHashMap();
-            entries.forEach(
-                entry -> {
-                  try {
-                    final K key = keyCoder.decode(entry.getKey().newInput());
-                    final Object structuralKey = keyCoder.structuralValue(key);
-                    KeyState keyState =
-                        keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
-                    if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT) return;
-                    entryMap.compute(
-                        structuralKey,
-                        (k, v) -> {
-                          if (v == null) v = new ConcatIterables<>();
-                          v.extendWith(entry.getValue());
-                          keyState.existence = KeyExistence.KNOWN_EXIST;
-                          return v;
-                        });
-                  } catch (IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                });
+            // If a key returned by windmill is known to be no longer exist or is completely cached
+            // locally, we can safely ignore the content of this key in windmill. Thus, we filter
+            // entries to filteredEntries which contains only keys that are known to exist and not
+            // fully cached.
+            Iterable<Entry<Object, Iterable<V>>> filteredEntries =
+                Iterables.filter(
+                    Iterables.transform(
+                        entries,
+                        entry -> {
+                          try {
+                            final K key = keyCoder.decode(entry.getKey().newInput());
+                            final Object structuralKey = keyCoder.structuralValue(key);
+                            KeyState keyState =
+                                keyStateMap.computeIfAbsent(structuralKey, k -> new KeyState(key));
+                            if (keyState.existence == KeyExistence.UNKNOWN_EXISTENCE) {
+                              keyState.existence = KeyExistence.KNOWN_EXIST;
+                            }
+                            return new AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
+                          } catch (IOException e) {
+                            throw new RuntimeException(e);
+                          }
+                        }),
+                    entry -> {
+                      KeyState keyState = keyStateMap.get(entry.getKey());
+                      return keyState.existence != KeyExistence.KNOWN_NONEXISTENT
+                          && !(keyState.existence == KeyExistence.KNOWN_EXIST
+                              && keyState.valuesCached);
+                    });
+
             if (entries instanceof Weighted) {
               // This is a known amount of data, cache them all.
-              entryMap.forEach(
-                  (structuralKey, values) -> {
+              filteredEntries.forEach(
+                  entry -> {
+                    final Object structuralKey = entry.getKey();
                     KeyState keyState = keyStateMap.get(structuralKey);
-                    if (!keyState.valuesCached) {
-                      keyState.values.extendWith(values);
-                      keyState.valuesCached = true;
-                    }
+                    keyState.existence = KeyExistence.KNOWN_EXIST;
+                    keyState.values.extendWith(entry.getValue());
+                    // We can't set keyState.valuesCached to true here, because there may be more

Review Comment:
   Done. Both cases now iterate through `entries` once and `keyStateMap` once. To avoid iterating through `keyStateMap` twice, I had to give `mergedCachedEntries` side effects. Since `mergedCachedEntries` is private and I have added a detailed comment, this should be fine.
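
   As a rough illustration of the trade-off, the sketch below shows the general pattern of a single pass over a map that both collects results and updates per-entry state. It is not the PR's implementation: `SinglePassSketch`, `State`, and `mergeAndFinalize` are hypothetical stand-ins, and the exact side effects in the real `mergedCachedEntries` may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SinglePassSketch {
  // Hypothetical stand-in for per-key cache state; not Beam's KeyState.
  static final class State {
    final List<String> values = new ArrayList<>();
    final boolean known;
    boolean cached;

    State(boolean known, String... initialValues) {
      this.known = known;
      for (String v : initialValues) {
        values.add(v);
      }
    }
  }

  // One pass over the map: drops entries whose existence is unknown, marks
  // the remaining entries as cached, and collects their values.
  // NOTE: this method mutates `states`; callers must not assume it is pure.
  static List<String> mergeAndFinalize(Map<String, State> states) {
    List<String> merged = new ArrayList<>();
    Iterator<Map.Entry<String, State>> it = states.entrySet().iterator();
    while (it.hasNext()) {
      State state = it.next().getValue();
      if (!state.known) {
        it.remove(); // safe removal during iteration
        continue;
      }
      state.cached = true;
      merged.addAll(state.values);
    }
    return merged;
  }
}
```

   Folding the cleanup into the collecting pass saves a second traversal, at the cost of a method whose name no longer tells the whole story. That is why keeping it private and documenting the mutation matters.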


