Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/05 04:55:08 UTC

[GitHub] [beam] sorokin-andrey commented on a diff in pull request #22584: [CdapIO] Integration CdapIO with SparkReceiverIO

sorokin-andrey commented on code in PR #22584:
URL: https://github.com/apache/beam/pull/22584#discussion_r938434822


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {
+      PluginConfig pluginConfig = getPluginConfig();
+      checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+      if (cdapPluginObj == null) {
+        try {
+          Constructor<?> constructor =
+              getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
+          constructor.setAccessible(true);
+          cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
+        } catch (Exception e) {
+          LOG.error("Can not instantiate CDAP plugin class", e);
+          throw new IllegalStateException("Can not call prepareRun");
         }
-      } else {
-        for (Map.Entry<String, String> entry :
-            getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+      }
+      try {
+        cdapPluginObj.prepareRun(getContext());

Review Comment:
   This try-catch exists only to handle this one line. Moving the call into a separate private method would make the code much more readable.
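
   A rough sketch of what I mean. `Lifecycle` and `PrepareRunSketch` are made-up stand-ins rather than the real CDAP/Beam types, a `String` replaces the real context object, and the error message is illustrative only:

```java
// Hypothetical stand-in for the plugin lifecycle interface; not the CDAP type.
interface Lifecycle {
  void prepareRun(String context) throws Exception;
}

// Hypothetical stand-in for the Plugin class under review.
class PrepareRunSketch {
  private final Lifecycle plugin;

  PrepareRunSketch(Lifecycle plugin) {
    this.plugin = plugin;
  }

  void prepareRun(String context) {
    callPrepareRun(context);
    // The format-configuration copying would follow here, outside any try block.
  }

  // The only statement that needs exception handling lives in its own method.
  private void callPrepareRun(String context) {
    try {
      plugin.prepareRun(context);
    } catch (Exception e) {
      // Keeping the cause attached is a choice made for this sketch.
      throw new IllegalStateException("prepareRun failed", e);
    }
  }
}
```

   With this shape the public method reads as a flat list of steps, and the exception handling sits next to the single call that needs it.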



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {

Review Comment:
   I would recommend inverting this condition and returning early when the plugin is unbounded.
   That removes one level of nesting from the rest of the method.
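
   Roughly like this, assuming nothing else needs to run for unbounded plugins after the `if` block (the diff does not show the rest of the method). `GuardClauseSketch` and the recorded step names are hypothetical and only stand in for the real work:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Plugin class; the steps list only records
// which parts of prepareRun() were reached.
class GuardClauseSketch {
  private final boolean unbounded;
  final List<String> steps = new ArrayList<>();

  GuardClauseSketch(boolean unbounded) {
    this.unbounded = unbounded;
  }

  boolean isUnbounded() {
    return unbounded;
  }

  void prepareRun() {
    if (isUnbounded()) {
      return; // guard clause: nothing to prepare for unbounded plugins
    }
    // The bounded path now sits at the method's top nesting level.
    steps.add("instantiate plugin");
    steps.add("call prepareRun");
    steps.add("copy format configuration");
  }
}
```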



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {
+      PluginConfig pluginConfig = getPluginConfig();
+      checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+      if (cdapPluginObj == null) {
+        try {
+          Constructor<?> constructor =
+              getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
+          constructor.setAccessible(true);
+          cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
+        } catch (Exception e) {
+          LOG.error("Can not instantiate CDAP plugin class", e);
+          throw new IllegalStateException("Can not call prepareRun");
         }
-      } else {
-        for (Map.Entry<String, String> entry :
-            getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+      }
+      try {
+        cdapPluginObj.prepareRun(getContext());
+        if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
+          for (Map.Entry<String, String> entry :

Review Comment:
   Personally, I think it is unclear what you are doing here and why. Also, these two for loops are almost identical; only the collection they iterate over differs.
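
   One possible way to remove the duplication is to pick the map first and then loop once. This is a sketch under assumptions: `FormatConfigSketch` and its methods are hypothetical, and a plain `Map` stands in for the Hadoop configuration object that the real code writes to with `set(key, value)`:

```java
import java.util.Map;

// Hypothetical helper; names are illustrative and not part of Beam or CDAP.
class FormatConfigSketch {
  enum PluginType { SOURCE, SINK }

  // Step 1: choose which format configuration applies to this plugin type.
  static Map<String, String> selectFormatConfiguration(
      PluginType type, Map<String, String> inputConfig, Map<String, String> outputConfig) {
    return type == PluginType.SOURCE ? inputConfig : outputConfig;
  }

  // Step 2: a single loop copies the chosen entries into the target configuration.
  static void copyInto(Map<String, String> target, Map<String, String> formatConfiguration) {
    for (Map.Entry<String, String> entry : formatConfiguration.entrySet()) {
      target.put(entry.getKey(), entry.getValue());
    }
  }
}
```

   A short comment or a descriptive method name on the copy step would also answer the "what and why" question for future readers.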



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {
+      PluginConfig pluginConfig = getPluginConfig();
+      checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+      if (cdapPluginObj == null) {

Review Comment:
   Why not move this conditional block into a separate method?
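
   Something like the following would do, as a sketch only. `InstantiationSketch`, `DemoConfig`, and `DemoPlugin` are hypothetical stand-ins for the real plugin and config classes; the sketch also narrows the caught type to `ReflectiveOperationException` and keeps the cause, which differs from the code in the diff:

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for the Plugin class under review.
class InstantiationSketch {
  // Demo types so the sketch runs on its own; not CDAP classes.
  static class DemoConfig {
    final String name;

    DemoConfig(String name) {
      this.name = name;
    }
  }

  static class DemoPlugin {
    final DemoConfig config;

    private DemoPlugin(DemoConfig config) {
      this.config = config;
    }
  }

  private Object pluginObj;

  // Lazily creates the plugin object once and reuses it afterwards.
  Object getOrCreatePlugin(Class<?> pluginClass, Object config) {
    if (pluginObj == null) {
      pluginObj = instantiate(pluginClass, config);
    }
    return pluginObj;
  }

  // The reflective construction and its error handling live in one place.
  private static Object instantiate(Class<?> pluginClass, Object config) {
    try {
      Constructor<?> constructor = pluginClass.getDeclaredConstructor(config.getClass());
      constructor.setAccessible(true);
      return constructor.newInstance(config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Can not instantiate plugin class", e);
    }
  }
}
```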


