You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:00 UTC

[rocketmq-streams] 01/16: ConfigurableComponent insert and query

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 93d7f7753a5c03a25c93388d4d3cb34e2eaa79a9
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 10:45:02 2022 +0800

    ConfigurableComponent insert and query
---
 .../client/transform/window/WindowInfo.java        |  3 +-
 .../rocketmq/streams/client/DBDriverTest.java      | 27 ++-----
 .../rocketmq/streams/client/ORMUtilTest.java       | 84 --------------------
 .../org/apache/rocketmq/streams/client/Person.java | 89 ++++++++++++++++++++++
 .../rocketmq/streams/common/utils/ReflectUtil.java |  2 +
 5 files changed, 97 insertions(+), 108 deletions(-)

diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
index e43e0943..73b543b3 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
@@ -27,8 +27,7 @@ public class WindowInfo {
     public static int HOPPING_WINDOW = 1;//滑动窗口
     public static int TUMBLING_WINDOW = 2;//滚动窗口
     public static int SESSION_WINDOW = 3;
-    public static int OVER_WINDOW = 4;
-    public static int SHUFFLE_OVER_WINDOW = 5;
+
     protected int type;//window类型 hopping,Tumbling
     protected Time windowSize;//窗口大小
     protected Time windowSlide;//滑动大小
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
index 0894c873..8d6ada4c 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
@@ -17,42 +17,25 @@
 
 package org.apache.rocketmq.streams.client;
 
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
-import org.apache.rocketmq.streams.configurable.model.Configure;
-import org.apache.rocketmq.streams.db.driver.DriverBuilder;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertNotNull;
 
-/**
- * 数据库的存储,需要配置存储的连接参数,请先完成配置,后执行单元用例 如果未建表,可以通过Configure.createTableSQL() 获取建表语句,创建表后,测试
- */
+
 public class DBDriverTest {
-    private String URL = "";
-    protected String USER_NAME = "";
-    protected String PASSWORD = "";
-    protected String TABLE_NAME = "rocketmq_streams_configure_source";
 
     @Test
     public void testDBConfigurableService() {
         String namespace = "streams.db.configurable";
 
-        //正式使用时,在配置文件配置
-        ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
-        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url
-        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名
-        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password
-        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME);
-
-        //如果表不存在,创建表
-        String sql = (Configure.createTableSQL(TABLE_NAME));
-        DriverBuilder.createDriver().execute(sql);
-        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance("2211");
         configurableComponent.insert(createPerson(namespace));
         configurableComponent.refreshConfigurable(namespace);
         Person person = configurableComponent.queryConfigurable("person", "peronName");
+        System.out.println(person.getName());
+        System.out.println(person.getAge());
+
         assertNotNull(person);
     }
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
index 9b649d5f..ab2961a0 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
@@ -86,87 +86,3 @@ public class ORMUtilTest {
 }
 
 
-class Person extends BasedConfigurable {
-    @ENVDependence
-    private String name;
-    private int age;
-    private Boolean isMale;
-    private List<String> addresses;
-    private Map<String, Integer> childName2Age;
-
-    public static Person createPerson(String namespace) {
-        Person person = new Person();
-        person.setNameSpace(namespace);
-        person.setType("person");
-        person.setConfigureName("Chris");
-        person.setName("Chris");
-        List<String> addresses = new ArrayList<>();
-        addresses.add("huilongguan");
-        addresses.add("shangdi");
-        person.setAddresses(addresses);
-        Map<String, Integer> childName2Age = new HashMap<>();
-        childName2Age.put("yuanyahan", 8);
-        childName2Age.put("yuanruxi", 4);
-        person.setChildName2Age(childName2Age);
-        person.setMale(true);
-        person.setAge(18);
-        return person;
-    }
-
-    @Override
-    public String toString() {
-        return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
-            + ", childName2Age=" + childName2Age + '}';
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public int getAge() {
-        return age;
-    }
-
-    public void setAge(int age) {
-        this.age = age;
-    }
-
-    public Boolean getMale() {
-        return isMale;
-    }
-
-    public void setMale(Boolean male) {
-        isMale = male;
-    }
-
-    public List<String> getAddresses() {
-        return addresses;
-    }
-
-    public void setAddresses(List<String> addresses) {
-        this.addresses = addresses;
-    }
-
-    public Map<String, Integer> getChildName2Age() {
-        return childName2Age;
-    }
-
-    public void setChildName2Age(Map<String, Integer> childName2Age) {
-        this.childName2Age = childName2Age;
-    }
-
-    @Override
-    public Object clone() {
-        Person person = null;
-        try {
-            person = (Person)super.clone();
-        } catch (CloneNotSupportedException e) {
-            System.out.println("clone error " + e);
-        }
-        return person;
-    }
-}
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java
new file mode 100644
index 00000000..780d7d9d
--- /dev/null
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java
@@ -0,0 +1,89 @@
+package org.apache.rocketmq.streams.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+import java.util.List;
+import java.util.Map;
+
+public class Person extends BasedConfigurable {
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    @Override
+    public String toString() {
+        return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
+                + ", childName2Age=" + childName2Age + '}';
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    public Boolean getMale() {
+        return isMale;
+    }
+
+    public void setMale(Boolean male) {
+        isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    @Override
+    public Object clone() {
+        Person person = null;
+        try {
+            person = (Person) super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e);
+        }
+        return person;
+    }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
index 77f1612c..de5a356f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
@@ -679,6 +679,7 @@ public class ReflectUtil {
             DataType dataType = DataTypeUtil.getDataTypeFromClass(fieldValue.getClass());
             Object convertFieldValue = dataType.convert(fieldValue);
             if (method != null) {
+                method.setAccessible(true);
                 method.invoke(object, convertFieldValue);
             } else {
                 Field field = object.getClass().getDeclaredField(modelFieldName);
@@ -753,6 +754,7 @@ public class ReflectUtil {
             if (method == null) {
                 throw new RuntimeException("can not get " + fieldName + "'s value, the method is not exist");
             }
+            method.setAccessible(true);
             return (T) method.invoke(bean);
         } catch (Exception e) {
             throw new RuntimeException("can not get " + fieldName + "'s value", e);