You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/11/16 03:07:38 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22187: Refactor pipeline distributed barrier with SPI

sandynz commented on code in PR #22187:
URL: https://github.com/apache/shardingsphere/pull/22187#discussion_r1023454607


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrier.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;

Review Comment:
   SPI could put in `spi` package



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrier.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@SingletonSPI
+public interface PipelineDistributedBarrier extends RequiredSPI, TypedSPI {

Review Comment:
   `TypedSPI` is not necessary, use `RequiredSPI` could simplify API usage



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrierFactory.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+/**
+ * Pipeline distributed barrier factory.
+ */
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineDistributedBarrierFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(PipelineDistributedBarrier.class);
+    }
+    
+    /**
+     * Get instance of pipeline distribute barrier.
+     *
+     * @param type type
+     * @return got instance
+     */
+    public static PipelineDistributedBarrier getInstance(final String type) {
+        return TypedSPIRegistry.getRegisteredService(PipelineDistributedBarrier.class, type);
+    }

Review Comment:
   1, `type` parameter could be removed
   
   2, RequiredSPIRegistry could be used to replace TypedSPIRegistry
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrierFactory.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;

Review Comment:
   SPI factory could be put in `spi` package



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ZookeeperPipelineDistributedBarrier.java:
##########
@@ -38,9 +39,7 @@
  * Pipeline distributed barrier.
  */
 @Slf4j
-public final class PipelineDistributedBarrier {
-    
-    private static final PipelineDistributedBarrier INSTANCE = new PipelineDistributedBarrier();
+public final class ZookeeperPipelineDistributedBarrier implements PipelineDistributedBarrier {

Review Comment:
   It's better not add `Zookeeper` prefix for class name, it could support all persistance repository later



##########
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier:
##########
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.api.impl.ZookeeperPipelineDistributedBarrier

Review Comment:
   New line is needed at end of file



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java:
##########
@@ -45,7 +46,7 @@ public void assertRegisterAndRemove() throws NoSuchFieldException, IllegalAccess
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+        PipelineDistributedBarrier instance = PipelineContext.getPipelineDistributedBarrier();
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
         Map countDownLatchMap = ReflectionUtil.getFieldValue(instance, "countDownLatchMap", Map.class);

Review Comment:
   Looks unit test failed since `countDownLatchMap`



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+    
+    @Override
+    public void register(final String barrierPath, final int totalCount) {
+    }
+    
+    @Override
+    public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+    }
+    
+    @Override
+    public void unregister(final String barrierPath) {
+    }
+    
+    @Override
+    public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+        return false;
+    }

Review Comment:
   Does `return false;` affect unit test? Seem it should `return true;`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -56,8 +56,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-    

Review Comment:
   Could we keep it and use PipelineDistributedBarrierFactory.getInstance() to replace PipelineDistributedBarrier.getInstance()



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java:
##########
@@ -53,6 +57,24 @@ public static void initModeConfig(final ModeConfiguration modeConfig) {
         PipelineContext.modeConfig = modeConfig;
     }
     
+    /**
+     * Get pipeline distributed barrier.
+     *
+     * @return pipeline distributed barrier
+     */
+    public static PipelineDistributedBarrier getPipelineDistributedBarrier() {
+        return pipelineDistributedBarrier;
+    }
+    
+    /**
+     * Initialize pipeline distributed barrier.
+     *
+     * @param type type
+     */
+    public static void initPipelineDistributedBarrier(final String type) {
+        pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance(type);
+    }

Review Comment:
   It could be removed



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+    
+    @Override
+    public void register(final String barrierPath, final int totalCount) {
+    }
+    
+    @Override
+    public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+    }
+    
+    @Override
+    public void unregister(final String barrierPath) {
+    }
+    
+    @Override
+    public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+        return false;
+    }
+    
+    @Override
+    public void notifyChildrenNodeCountCheck(final DataChangedEvent event) {
+    }
+    
+    @Override
+    public String getType() {
+        return "FIXTURE";
+    }

Review Comment:
   `getType()` could be replaced to `isDefault()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org