You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:00 UTC
[rocketmq-streams] 01/16: ConfigurableComponent insert and query
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 93d7f7753a5c03a25c93388d4d3cb34e2eaa79a9
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 10:45:02 2022 +0800
ConfigurableComponent insert and query
---
.../client/transform/window/WindowInfo.java | 3 +-
.../rocketmq/streams/client/DBDriverTest.java | 27 ++-----
.../rocketmq/streams/client/ORMUtilTest.java | 84 --------------------
.../org/apache/rocketmq/streams/client/Person.java | 89 ++++++++++++++++++++++
.../rocketmq/streams/common/utils/ReflectUtil.java | 2 +
5 files changed, 97 insertions(+), 108 deletions(-)
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
index e43e0943..73b543b3 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java
@@ -27,8 +27,7 @@ public class WindowInfo {
public static int HOPPING_WINDOW = 1;//滑动窗口
public static int TUMBLING_WINDOW = 2;//滚动窗口
public static int SESSION_WINDOW = 3;
- public static int OVER_WINDOW = 4;
- public static int SHUFFLE_OVER_WINDOW = 5;
+
protected int type;//window类型 hopping,Tumbling
protected Time windowSize;//窗口大小
protected Time windowSlide;//滑动大小
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
index 0894c873..8d6ada4c 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java
@@ -17,42 +17,25 @@
package org.apache.rocketmq.streams.client;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
-import org.apache.rocketmq.streams.configurable.model.Configure;
-import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.junit.Test;
import static junit.framework.TestCase.assertNotNull;
-/**
- * 数据库的存储,需要配置存储的连接参数,请先完成配置,后执行单元用例 如果未建表,可以通过Configure.createTableSQL() 获取建表语句,创建表后,测试
- */
+
public class DBDriverTest {
- private String URL = "";
- protected String USER_NAME = "";
- protected String PASSWORD = "";
- protected String TABLE_NAME = "rocketmq_streams_configure_source";
@Test
public void testDBConfigurableService() {
String namespace = "streams.db.configurable";
- //正式使用时,在配置文件配置
- ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
- ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url
- ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名
- ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password
- ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME);
-
- //如果表不存在,创建表
- String sql = (Configure.createTableSQL(TABLE_NAME));
- DriverBuilder.createDriver().execute(sql);
- ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance("2211");
configurableComponent.insert(createPerson(namespace));
configurableComponent.refreshConfigurable(namespace);
Person person = configurableComponent.queryConfigurable("person", "peronName");
+ System.out.println(person.getName());
+ System.out.println(person.getAge());
+
assertNotNull(person);
}
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
index 9b649d5f..ab2961a0 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java
@@ -86,87 +86,3 @@ public class ORMUtilTest {
}
-class Person extends BasedConfigurable {
- @ENVDependence
- private String name;
- private int age;
- private Boolean isMale;
- private List<String> addresses;
- private Map<String, Integer> childName2Age;
-
- public static Person createPerson(String namespace) {
- Person person = new Person();
- person.setNameSpace(namespace);
- person.setType("person");
- person.setConfigureName("Chris");
- person.setName("Chris");
- List<String> addresses = new ArrayList<>();
- addresses.add("huilongguan");
- addresses.add("shangdi");
- person.setAddresses(addresses);
- Map<String, Integer> childName2Age = new HashMap<>();
- childName2Age.put("yuanyahan", 8);
- childName2Age.put("yuanruxi", 4);
- person.setChildName2Age(childName2Age);
- person.setMale(true);
- person.setAge(18);
- return person;
- }
-
- @Override
- public String toString() {
- return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
- + ", childName2Age=" + childName2Age + '}';
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
-
- public Boolean getMale() {
- return isMale;
- }
-
- public void setMale(Boolean male) {
- isMale = male;
- }
-
- public List<String> getAddresses() {
- return addresses;
- }
-
- public void setAddresses(List<String> addresses) {
- this.addresses = addresses;
- }
-
- public Map<String, Integer> getChildName2Age() {
- return childName2Age;
- }
-
- public void setChildName2Age(Map<String, Integer> childName2Age) {
- this.childName2Age = childName2Age;
- }
-
- @Override
- public Object clone() {
- Person person = null;
- try {
- person = (Person)super.clone();
- } catch (CloneNotSupportedException e) {
- System.out.println("clone error " + e);
- }
- return person;
- }
-}
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java
new file mode 100644
index 00000000..780d7d9d
--- /dev/null
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java
@@ -0,0 +1,89 @@
+package org.apache.rocketmq.streams.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+import java.util.List;
+import java.util.Map;
+
+public class Person extends BasedConfigurable {
+ @ENVDependence
+ private String name;
+ private int age;
+ private Boolean isMale;
+ private List<String> addresses;
+ private Map<String, Integer> childName2Age;
+
+ @Override
+ public String toString() {
+ return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
+ + ", childName2Age=" + childName2Age + '}';
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ public Boolean getMale() {
+ return isMale;
+ }
+
+ public void setMale(Boolean male) {
+ isMale = male;
+ }
+
+ public List<String> getAddresses() {
+ return addresses;
+ }
+
+ public void setAddresses(List<String> addresses) {
+ this.addresses = addresses;
+ }
+
+ public Map<String, Integer> getChildName2Age() {
+ return childName2Age;
+ }
+
+ public void setChildName2Age(Map<String, Integer> childName2Age) {
+ this.childName2Age = childName2Age;
+ }
+
+ @Override
+ public Object clone() {
+ Person person = null;
+ try {
+ person = (Person) super.clone();
+ } catch (CloneNotSupportedException e) {
+ System.out.println("clone error " + e);
+ }
+ return person;
+ }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
index 77f1612c..de5a356f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
@@ -679,6 +679,7 @@ public class ReflectUtil {
DataType dataType = DataTypeUtil.getDataTypeFromClass(fieldValue.getClass());
Object convertFieldValue = dataType.convert(fieldValue);
if (method != null) {
+ method.setAccessible(true);
method.invoke(object, convertFieldValue);
} else {
Field field = object.getClass().getDeclaredField(modelFieldName);
@@ -753,6 +754,7 @@ public class ReflectUtil {
if (method == null) {
throw new RuntimeException("can not get " + fieldName + "'s value, the method is not exist");
}
+ method.setAccessible(true);
return (T) method.invoke(bean);
} catch (Exception e) {
throw new RuntimeException("can not get " + fieldName + "'s value", e);