You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/22 08:47:13 UTC

[flink-statefun] branch release-2.2 updated (cc83110 -> 875ef5f)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from cc83110  [release] [docs] Update docs config for 2.2
     new 71b8b71  [hotfix][dockerfile] Add newline to end of flink-conf
     new 22fc4d3  [FLINK-19330][core] Move initialization logic to open() instead of initializeState()
     new 80d4c70  [FLINK-19329] Check for the existence of managed resources during dispose
     new 875ef5f  [FLINK-19327][k8s] Bump JobManager heap size to 1 GB

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/functions/FunctionGroupOperator.java        | 14 +++++++++-----
 .../flink-distribution-template/conf/flink-conf.yaml       |  2 +-
 tools/k8s/templates/master-deployment.yaml                 |  3 +++
 tools/k8s/values.yaml                                      |  3 ++-
 4 files changed, 15 insertions(+), 7 deletions(-)


[flink-statefun] 04/04: [FLINK-19327][k8s] Bump JobManager heap size to 1 GB

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 875ef5f73f9bc073a5955db94d1d7968d9695fa6
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Sep 21 14:49:07 2020 +0200

    [FLINK-19327][k8s] Bump JobManager heap size to 1 GB
    
    This closes #157.
---
 tools/k8s/templates/master-deployment.yaml | 3 +++
 tools/k8s/values.yaml                      | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/tools/k8s/templates/master-deployment.yaml b/tools/k8s/templates/master-deployment.yaml
index e79f20d..1f69f60 100644
--- a/tools/k8s/templates/master-deployment.yaml
+++ b/tools/k8s/templates/master-deployment.yaml
@@ -37,6 +37,9 @@ spec:
               value: master
             - name: MASTER_HOST
               value: {{ .Values.master.name }}
+          resources:
+            requests:
+              memory: "{{ .Values.master.container_mem }}"
           ports:
             - containerPort: 6123
               name: rpc
diff --git a/tools/k8s/values.yaml b/tools/k8s/values.yaml
index 112ff80..6568daf 100644
--- a/tools/k8s/values.yaml
+++ b/tools/k8s/values.yaml
@@ -22,7 +22,8 @@ checkpoint:
 master:
   name: statefun-master
   image: statefun-application # replace with your image
-  jvm_mem: 500m
+  jvm_mem: 1g
+  container_mem: 1.5Gi
 
 worker:
   name: statefun-worker


[flink-statefun] 03/04: [FLINK-19329] Check for the existence of managed resources during dispose

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 80d4c70d799abf1bde8d2195b27bc6b9555a4694
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Sep 21 16:15:00 2020 +0200

    [FLINK-19329] Check for the existence of managed resources during dispose
    
    This closes #158.
---
 .../flink/statefun/flink/core/functions/FunctionGroupOperator.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 9cc0818..381a672 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -184,6 +184,12 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   }
 
   private void closeOrDispose() {
+    final List<ManagingResources> managingResources = this.managingResources;
+    if (managingResources == null) {
+      // dispose can be called before state initialization was completed (for example a failure
+      // during initialization).
+      return;
+    }
     for (ManagingResources withResources : managingResources) {
       try {
         withResources.shutdown();


[flink-statefun] 01/04: [hotfix][dockerfile] Add newline to end of flink-conf

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 71b8b7172f5fe6ab41aecc53250531f8b5b42e0d
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Sep 21 17:18:55 2020 -0500

    [hotfix][dockerfile] Add newline to end of flink-conf
    
    When building small applications it's often easy to simply append
    Flink configurations to the provided base image directly inside the
    Dockerfile. echo does not prepend a newline, so it's convenient to leave
    one at the end of the file. Otherwise, the first appended configuration
    ends up on the same line as the task manager memory amount, which leads
    to a strange parse error on deployment.
    
    This closes #160.
---
 tools/docker/flink-distribution-template/conf/flink-conf.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/docker/flink-distribution-template/conf/flink-conf.yaml b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
index 929df96..8b9b4e4 100644
--- a/tools/docker/flink-distribution-template/conf/flink-conf.yaml
+++ b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
@@ -45,4 +45,4 @@ state.backend.incremental: true
 #==============================================================================
 
 jobmanager.memory.process.size: 1g
-taskmanager.memory.process.size: 4g
\ No newline at end of file
+taskmanager.memory.process.size: 4g


[flink-statefun] 02/04: [FLINK-19330][core] Move initialization logic to open() instead of initializeState()

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 22fc4d34e2e6f3792cd041e447c400a2fd39e486
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Sep 21 21:00:08 2020 +0200

    [FLINK-19330][core] Move initialization logic to open() instead of initializeState()
    
    This commit moves the initialization logic of the operator to open()
    instead of initializeState(). The reason is that starting from Flink 1.10,
    the runtimeContext is not yet properly initialized when initializeState() is called.
    
    This closes #159.
---
 .../statefun/flink/core/functions/FunctionGroupOperator.java      | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index cbb9131..9cc0818 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -93,9 +92,8 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   }
 
   @Override
-  public void initializeState(StateInitializationContext context) throws Exception {
-    super.initializeState(context);
-
+  public void open() throws Exception {
+    super.open();
     final StatefulFunctionsUniverse statefulFunctionsUniverse =
         statefulFunctionsUniverse(configuration);
 
@@ -108,7 +106,7 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
         new ListStateDescriptor<>(
             FlinkStateDelayedMessagesBuffer.BUFFER_STATE_NAME, envelopeSerializer.duplicate());
     final MapState<Long, Message> asyncOperationState =
-        context.getKeyedStateStore().getMapState(asyncOperationStateDescriptor);
+        getRuntimeContext().getMapState(asyncOperationStateDescriptor);
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");