You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Huang Meilong <im...@outlook.com> on 2018/08/16 11:17:57 UTC
答复: problem when streaming data to a sql table
i resolved it by setting KEY_TYPE and VALUE_TYPE.
________________________________
发件人: Huang Meilong <im...@outlook.com>
发送时间: 2018年8月16日 17:42:33
收件人: user@ignite.apache.org
主题: problem when streaming data to a sql table
Hi,
I'm streaming data to a sql table like this,
'''
public static void main(String[] args) throws Exception {
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://worker-1/");
System.out.println("started jdbc connection...");
// Create database tables
Statement stmt = conn.createStatement();
// Create table based on PARTITIONED template with one backup
stmt.executeUpdate("drop table if EXISTS MX;");
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS MX (" +
" timeStamp LONG, metricName VARCHAR, clusterId VARCHAR, hostName VARCHAR, metricValue FLOAT," +
" PRIMARY KEY (timeStamp, metricName, clusterId, hostName)) " +
" WITH \"backups=1, CACHE_NAME=MX, DATA_REGION=MX_24GB_Region\"");
stmt.executeUpdate("CREATE INDEX idx_timestamp_v1 ON MX (timeStamp)");
stmt.executeUpdate("CREATE INDEX idx_metric_name_v1 ON MX (metricName)");
stmt.executeUpdate("CREATE INDEX idx_cluster_id_v1 ON MX (clusterId)");
stmt.executeUpdate("CREATE INDEX idx_hostname_v1 ON MX (hostName)");
System.out.println("created table...");
Ignition.setClientMode(true);
IgniteConfiguration cfg = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder1 = new TcpDiscoveryVmIpFinder();
ipFinder1.setAddresses(Arrays.asList("worker-1:47500..47502", "worker-2:47500..47502", "worker-3:47500..47502"));
TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
ipFinder.setAddresses(Arrays.asList("worker-1:47500..47502", "worker-2:47500..47502", "worker-3:47500..47502"));
discoverySpi.setIpFinder(ipFinder);
cfg.setDiscoverySpi(discoverySpi);
// cfg.setActiveOnStart(true);
Ignite ignite = Ignition.start(cfg);
System.out.println("start ignite...");
IgniteCache<String, MX> stmCache = ignite.getOrCreateCache("MX");
IgniteDataStreamer<String, MX> stmr = ignite.dataStreamer(stmCache.getName());
MX mx = new MX();
mx.setClusterId("MX");
mx.setMetricName("metric");
mx.setHostName("host-1");
mx.setMetricValue(80.0);
stmr.addData("MX", mx);
try (ResultSet rs =
stmt.executeQuery("SELECT AVG(metricValue) FROM MX")) {
System.out.println("Query result:");
while (rs.next())
System.out.println(">>> " + rs.getString(1));
}
}
'''
after putting data to the streamer, the result set is empty where execute query on that table.
class MX is as below:
public class MX implements Serializable {
@QuerySqlField(index = true)
private Long timesTamp;
@QuerySqlField(index = true)
private String metricName;
@QuerySqlField(index = true)
private String clusterId;
@QuerySqlField(index = true)
private String hostName;
@QuerySqlField
private Double metricValue;
public Long getTimesTamp() {
return timesTamp;
}
public void setTimesTamp(Long timesTamp) {
this.timesTamp = timesTamp;
}
public String getMetricName() {
return metricName;
}
public void setMetricName(String metricName) {
this.metricName = metricName;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public Double getMetricValue() {
return metricValue;
}
public void setMetricValue(Double metricValue) {
this.metricValue = metricValue;
}
}
can anyone give me some suggestions, thank you very much!