You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ab...@apache.org on 2014/02/26 16:48:04 UTC
[1/2] git commit: BUG-12536 - Hadoop cluster alerts are not being
communicated through SysLogger
Repository: ambari
Updated Branches:
refs/heads/branch-1.2.5 384e18672 -> defe744fc
refs/heads/trunk d8cfceb9d -> 682e55d04
BUG-12536 - Hadoop cluster alerts are not being communicated through SysLogger
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/defe744f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/defe744f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/defe744f
Branch: refs/heads/branch-1.2.5
Commit: defe744fccd7330eea9cd794beeef6c7564173da
Parents: 384e186
Author: Artem Baranchuk <ab...@hortonworks.com>
Authored: Wed Feb 12 01:28:20 2014 +0200
Committer: Artem Baranchuk <ab...@hortonworks.com>
Committed: Wed Feb 12 01:28:20 2014 +0200
----------------------------------------------------------------------
.../src/addOns/nagios/plugins/sys_logger.py | 76 +++++++++++++++-----
1 file changed, 60 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/defe744f/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
----------------------------------------------------------------------
diff --git a/contrib/addons/src/addOns/nagios/plugins/sys_logger.py b/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
index 7b716c5..251e313 100644
--- a/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
+++ b/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
@@ -30,27 +30,71 @@ degraded_alert_services = ['HBASEMASTER::HBase Master CPU utilization',
fatal_alert_services = ['NAMENODE::NameNode process down']
# dictionary of service->msg_id mappings
-msg_ids = {'Host::Ping':'host_down', 'HBASEMASTER::HBase Master CPU utilization':'master_cpu_utilization',
- 'HDFS::HDFS capacity utilization':'hdfs_percent_capacity', 'HDFS::Corrupt/Missing blocks':'hdfs_block',
- 'NAMENODE::NameNode edit logs directory status':'namenode_edit_log_write', 'HDFS::Percent DataNodes down':'datanode_down',
- 'DATANODE::DataNode process down':'datanode_process_down', 'HDFS::Percent DataNodes storage full':'datanodes_percent_storage_full',
- 'NAMENODE::NameNode process down':'namenode_process_down', 'HDFS::NameNode RPC latency':'namenode_rpc_latency',
- 'DATANODE::DataNode storage full':'datanodes_storage_full', 'JOBTRACKER::JobTracker process down':'jobtracker_process_down',
- 'MAPREDUCE::JobTracker RPC latency':'jobtracker_rpc_latency', 'MAPREDUCE::Percent TaskTrackers down':'tasktrackers_down',
- 'TASKTRACKER::TaskTracker process down':'tasktracker_process_down', 'HBASEMASTER::HBase Master process down':'hbasemaster_process_down',
- 'REGIONSERVER::RegionServer process down':'regionserver_process_down', 'HBASE::Percent RegionServers down':'regionservers_down',
- 'HIVE-METASTORE::Hive Metastore status check':'hive_metastore_process_down', 'ZOOKEEPER::Percent ZooKeeper Servers down':'zookeepers_down',
- 'ZOOKEEPER::ZooKeeper Server process down':'zookeeper_process_down', 'OOZIE::Oozie Server status check':'oozie_down',
- 'WEBHCAT::WebHCat Server status check':'templeton_down', 'PUPPET::Puppet agent down':'puppet_down',
- 'NAGIOS::Nagios status log staleness':'nagios_status_log_stale', 'GANGLIA::Ganglia [gmetad] process down':'ganglia_process_down',
+msg_ids = {'Host::Ping':'host_down',
+ 'HBASEMASTER::HBase Master CPU utilization':'master_cpu_utilization',
+ 'HDFS::HDFS capacity utilization':'hdfs_percent_capacity',
+ 'HDFS::Corrupt/Missing blocks':'hdfs_block',
+ 'NAMENODE::NameNode edit logs directory status':'namenode_edit_log_write',
+ 'HDFS::Percent DataNodes down':'datanode_down',
+ 'DATANODE::DataNode process down':'datanode_process_down',
+ 'HDFS::Percent DataNodes storage full':'datanodes_percent_storage_full',
+ 'NAMENODE::NameNode process down':'namenode_process_down',
+ 'HDFS::NameNode RPC latency':'namenode_rpc_latency',
+ 'DATANODE::DataNode storage full':'datanodes_storage_full',
+ 'JOBTRACKER::JobTracker process down':'jobtracker_process_down',
+ 'MAPREDUCE::JobTracker RPC latency':'jobtracker_rpc_latency',
+ 'MAPREDUCE::Percent TaskTrackers down':'tasktrackers_down',
+ 'TASKTRACKER::TaskTracker process down':'tasktracker_process_down',
+ 'HBASEMASTER::HBase Master process down':'hbasemaster_process_down',
+ 'REGIONSERVER::RegionServer process down':'regionserver_process_down',
+ 'HBASE::Percent RegionServers down':'regionservers_down',
+ 'HIVE-METASTORE::Hive Metastore status check':'hive_metastore_process_down',
+ 'ZOOKEEPER::Percent ZooKeeper Servers down':'zookeepers_down',
+ 'ZOOKEEPER::ZooKeeper Server process down':'zookeeper_process_down',
+ 'OOZIE::Oozie Server status check':'oozie_down',
+ 'WEBHCAT::WebHCat Server status check':'templeton_down',
+ 'PUPPET::Puppet agent down':'puppet_down',
+ 'NAGIOS::Nagios status log staleness':'nagios_status_log_stale',
+ 'GANGLIA::Ganglia [gmetad] process down':'ganglia_process_down',
'GANGLIA::Ganglia Collector [gmond] process down alert for HBase Master':'ganglia_collector_process_down',
'GANGLIA::Ganglia Collector [gmond] process down alert for JobTracker':'ganglia_collector_process_down',
'GANGLIA::Ganglia Collector [gmond] process down alert for NameNode':'ganglia_collector_process_down',
'GANGLIA::Ganglia Collector [gmond] process down alert for slaves':'ganglia_collector_process_down',
'NAMENODE::Secondary NameNode process down':'secondary_namenode_process_down',
'JOBTRACKER::JobTracker CPU utilization':'jobtracker_cpu_utilization',
- 'HBASEMASTER::HBase Master Web UI down':'hbase_ui_down', 'NAMENODE::NameNode Web UI down':'namenode_ui_down',
- 'JOBTRACKER::JobHistory Web UI down':'jobhistory_ui_down', 'JOBTRACKER::JobTracker Web UI down':'jobtracker_ui_down'}
+ 'HBASEMASTER::HBase Master Web UI down':'hbase_ui_down',
+ 'NAMENODE::NameNode Web UI down':'namenode_ui_down',
+ 'JOBTRACKER::JobHistory Web UI down':'jobhistory_ui_down',
+ 'JOBTRACKER::JobTracker Web UI down':'jobtracker_ui_down',
+
+
+ 'HBASEMASTER::HBaseMaster CPU utilization':'master_cpu_utilization',
+ 'HDFS::HDFS Capacity utilization':'hdfs_percent_capacity',
+ 'NAMENODE::Namenode Edit logs directory status':'namenode_edit_log_write',
+ 'DATANODE::Process down':'datanode_process_down',
+ 'NAMENODE::Namenode Process down':'namenode_process_down',
+ 'HDFS::Namenode RPC Latency':'namenode_rpc_latency',
+ 'DATANODE::Storage full':'datanodes_storage_full',
+ 'JOBTRACKER::Jobtracker Process down':'jobtracker_process_down',
+ 'MAPREDUCE::JobTracker RPC Latency':'jobtracker_rpc_latency',
+ 'TASKTRACKER::Process down':'tasktracker_process_down',
+ 'HBASEMASTER::HBaseMaster Process down':'hbasemaster_process_down',
+ 'REGIONSERVER::Process down':'regionserver_process_down',
+ 'HBASE::Percent region servers down':'regionservers_down',
+ 'HIVE-METASTORE::HIVE-METASTORE status check':'hive_metastore_process_down',
+ 'ZOOKEEPER::Percent zookeeper servers down':'zookeepers_down',
+ 'ZKSERVERS::ZKSERVERS Process down':'zookeeper_process_down',
+ 'Oozie status check':'oozie_down',
+ 'WEBHCAT::WebHcat status check':'templeton_down',
+ 'GANGLIA::Ganglia [gmetad] Process down':'ganglia_process_down',
+ 'GANGLIA::Ganglia collector [gmond] Process down alert for hbasemaster':'ganglia_collector_process_down',
+ 'GANGLIA::Ganglia collector [gmond] Process down alert for jobtracker':'ganglia_collector_process_down',
+ 'GANGLIA::Ganglia collector [gmond] Process down alert for namenode':'ganglia_collector_process_down',
+ 'GANGLIA::Ganglia collector [gmond] Process down alert for slaves':'ganglia_collector_process_down',
+ 'NAMENODE::Secondary Namenode Process down':'secondary_namenode_process_down',
+ 'JOBTRACKER::Jobtracker CPU utilization':'jobtracker_cpu_utilization',
+ 'HBASEMASTER::HBase Web UI down':'hbase_ui_down',
+ 'NAMENODE::Namenode Web UI down':'namenode_ui_down'}
# Determine the severity of the TVI alert based on the Nagios alert state.
@@ -87,7 +131,7 @@ def determine_domain():
# log the TVI msg to the syslog
def log_tvi_msg(msg):
- syslog.openlog('Hadoop', syslog.LOG_PID)
+ syslog.openlog('nagios', syslog.LOG_PID)
syslog.syslog(msg)
[2/2] git commit: AMBARI-4840 - Need 2 versions of metrics sink
Posted by ab...@apache.org.
AMBARI-4840 - Need 2 versions of metrics sink
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/682e55d0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/682e55d0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/682e55d0
Branch: refs/heads/trunk
Commit: 682e55d04a5a82f42178855fabaa27c573a8558f
Parents: d8cfceb
Author: Artem Baranchuk <ab...@hortonworks.com>
Authored: Wed Feb 26 16:18:41 2014 +0200
Committer: Artem Baranchuk <ab...@hortonworks.com>
Committed: Wed Feb 26 17:43:39 2014 +0200
----------------------------------------------------------------------
contrib/ambari-scom/metrics-sink/pom.xml | 69 +++++++++++++-
.../metrics2/sink/SqlServerSinkHadoop1Test.java | 97 ++++++++++++++++++++
.../metrics2/sink/SqlServerSinkHadoop2Test.java | 97 ++++++++++++++++++++
.../hadoop/metrics2/sink/SqlServerSinkTest.java | 83 +++--------------
.../hadoop/metrics2/sink/SqlServerSink.java | 43 ++++-----
.../metrics2/sink/SqlServerSinkHadoop1.java | 41 +++++++++
.../metrics2/sink/SqlServerSinkHadoop2.java | 40 ++++++++
7 files changed, 368 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/pom.xml b/contrib/ambari-scom/metrics-sink/pom.xml
index c005c2e..7c2706a 100644
--- a/contrib/ambari-scom/metrics-sink/pom.xml
+++ b/contrib/ambari-scom/metrics-sink/pom.xml
@@ -44,13 +44,72 @@
<version>3.1</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>[1.0,2.0)</version>
- </dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop1</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>[1.0,2.0)</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/SqlServerSinkHadoop2.java</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/SqlServerSinkHadoop2Test.java</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/SqlServerSinkHadoop1.java</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/SqlServerSinkHadoop1Test.java</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<build>
<pluginManagement>
<plugins>
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
new file mode 100644
index 0000000..4e9c44a
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import com.microsoft.sqlserver.jdbc.SQLServerDriver;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.easymock.EasyMock.*;
+
+public class SqlServerSinkHadoop1Test extends SqlServerSinkTest<SqlServerSinkHadoop1> {
+ @Override
+ public SqlServerSinkHadoop1 createInstance() throws InstantiationException, IllegalAccessException {
+ return new SqlServerSinkHadoop1();
+ }
+
+ @Override
+ @Test
+ public void testPutMetrics() throws Exception {
+ SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
+ Connection connection = createNiceMock(Connection.class);
+ CallableStatement cstmt = createNiceMock(CallableStatement.class);
+ MetricsRecord record = createNiceMock(MetricsRecord.class);
+ Metric metric = createNiceMock(Metric.class);
+
+ // set expectations
+ expect(configuration.getParent()).andReturn(null);
+ expect(configuration.getPrefix()).andReturn("prefix");
+ expect(configuration.getString("databaseUrl")).andReturn("url");
+
+ expect(record.context()).andReturn("context");
+ expect(record.name()).andReturn("typeName");
+ expect(record.tags()).andReturn(new HashSet<MetricsTag>());
+ expect(record.timestamp()).andReturn(9999L);
+
+ expect(record.metrics()).andReturn(Collections.singleton(metric));
+
+ expect(metric.name()).andReturn("name").anyTimes();
+ expect(metric.value()).andReturn(1234);
+
+ expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
+ cstmt.setNString(1, "context");
+ cstmt.setNString(2, "typeName");
+ cstmt.setNString(eq(3), (String) anyObject());
+ cstmt.setNString(eq(4), (String) anyObject());
+ cstmt.setNString(eq(5), (String) anyObject());
+ cstmt.setNString(6, "prefix");
+ cstmt.setNString(7, "sourceName:prefix");
+ cstmt.setLong(8, 9999L);
+ cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
+ expect(cstmt.execute()).andReturn(true);
+ expect(cstmt.getLong(9)).andReturn(99L);
+ expect(cstmt.wasNull()).andReturn(false);
+
+ expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
+ cstmt.setLong(1, 99L);
+ cstmt.setNString(2, "name");
+ cstmt.setNString(3, "1234");
+ expect(cstmt.execute()).andReturn(true);
+
+ // replay
+ replay(configuration, connection, cstmt, record, metric);
+
+ SqlServerSink sink = createInstance();
+
+ sink.init(configuration);
+
+ SQLServerDriver.setConnection(connection);
+
+ sink.putMetrics(record);
+
+ verify(configuration, connection, cstmt, record, metric);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
new file mode 100644
index 0000000..dd94fd1
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import com.microsoft.sqlserver.jdbc.SQLServerDriver;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.easymock.EasyMock.*;
+
+public class SqlServerSinkHadoop2Test extends SqlServerSinkTest<SqlServerSinkHadoop2> {
+ @Override
+ public SqlServerSinkHadoop2 createInstance() throws InstantiationException, IllegalAccessException {
+ return new SqlServerSinkHadoop2();
+ }
+
+ @Override
+ @Test
+ public void testPutMetrics() throws Exception {
+ SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
+ Connection connection = createNiceMock(Connection.class);
+ CallableStatement cstmt = createNiceMock(CallableStatement.class);
+ MetricsRecord record = createNiceMock(MetricsRecord.class);
+ AbstractMetric metric = createNiceMock(AbstractMetric.class);
+
+ // set expectations
+ expect(configuration.getParent()).andReturn(null);
+ expect(configuration.getPrefix()).andReturn("prefix");
+ expect(configuration.getString("databaseUrl")).andReturn("url");
+
+ expect(record.context()).andReturn("context");
+ expect(record.name()).andReturn("typeName");
+ expect(record.tags()).andReturn(new HashSet<MetricsTag>());
+ expect(record.timestamp()).andReturn(9999L);
+
+ expect(record.metrics()).andReturn(Collections.singleton(metric));
+
+ expect(metric.name()).andReturn("name").anyTimes();
+ expect(metric.value()).andReturn(1234);
+
+ expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
+ cstmt.setNString(1, "context");
+ cstmt.setNString(2, "typeName");
+ cstmt.setNString(eq(3), (String) anyObject());
+ cstmt.setNString(eq(4), (String) anyObject());
+ cstmt.setNString(eq(5), (String) anyObject());
+ cstmt.setNString(6, "prefix");
+ cstmt.setNString(7, "sourceName:prefix");
+ cstmt.setLong(8, 9999L);
+ cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
+ expect(cstmt.execute()).andReturn(true);
+ expect(cstmt.getLong(9)).andReturn(99L);
+ expect(cstmt.wasNull()).andReturn(false);
+
+ expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
+ cstmt.setLong(1, 99L);
+ cstmt.setNString(2, "name");
+ cstmt.setNString(3, "1234");
+ expect(cstmt.execute()).andReturn(true);
+
+ // replay
+ replay(configuration, connection, cstmt, record, metric);
+
+ SqlServerSink sink = createInstance();
+
+ sink.init(configuration);
+
+ SQLServerDriver.setConnection(connection);
+
+ sink.putMetrics(record);
+
+ verify(configuration, connection, cstmt, record, metric);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
index 4d4f6dd..f50dd6e 100644
--- a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
@@ -20,20 +20,13 @@ package org.apache.hadoop.metrics2.sink;
import com.microsoft.sqlserver.jdbc.SQLServerDriver;
import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.Metric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsTag;
import org.junit.Assert;
import org.junit.Test;
import java.sql.CallableStatement;
import java.sql.Connection;
-import java.util.Collections;
-import java.util.HashSet;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -41,7 +34,10 @@ import static org.easymock.EasyMock.verify;
/**
* SqlServerSink Tests.
*/
-public class SqlServerSinkTest {
+public abstract class SqlServerSinkTest<T extends SqlServerSink> {
+
+ public abstract T createInstance() throws InstantiationException, IllegalAccessException;
+
@Test
public void testInit() throws Exception {
@@ -55,7 +51,7 @@ public class SqlServerSinkTest {
// replay
replay(configuration);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
@@ -76,11 +72,11 @@ public class SqlServerSinkTest {
// replay
replay(configuration, connection);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
- Assert.assertFalse(sink.ensureConnection());
+ Assert.assertTrue(sink.ensureConnection());
SQLServerDriver.setConnection(connection);
Assert.assertTrue(sink.ensureConnection());
@@ -103,7 +99,7 @@ public class SqlServerSinkTest {
// replay
replay(configuration, connection);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
@@ -142,7 +138,7 @@ public class SqlServerSinkTest {
// replay
replay(configuration, connection, cstmt);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
@@ -182,7 +178,7 @@ public class SqlServerSinkTest {
// replay
replay(configuration, connection, cstmt);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
@@ -214,7 +210,7 @@ public class SqlServerSinkTest {
// replay
replay(configuration, connection, cstmt);
- SqlServerSink sink = new SqlServerSink();
+ SqlServerSink sink = createInstance();
sink.init(configuration);
@@ -225,60 +221,5 @@ public class SqlServerSinkTest {
verify(configuration, connection, cstmt);
}
- @Test
- public void testPutMetrics() throws Exception {
- SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
- Connection connection = createNiceMock(Connection.class);
- CallableStatement cstmt = createNiceMock(CallableStatement.class);
- MetricsRecord record = createNiceMock(MetricsRecord.class);
- Metric metric = createNiceMock(Metric.class);
-
- // set expectations
- expect(configuration.getParent()).andReturn(null);
- expect(configuration.getPrefix()).andReturn("prefix");
- expect(configuration.getString("databaseUrl")).andReturn("url");
-
- expect(record.context()).andReturn("context");
- expect(record.name()).andReturn("typeName");
- expect(record.tags()).andReturn(new HashSet<MetricsTag>());
- expect(record.timestamp()).andReturn(9999L);
-
- expect(record.metrics()).andReturn(Collections.singleton(metric));
-
- expect(metric.name()).andReturn("name").anyTimes();
- expect(metric.value()).andReturn(1234);
-
- expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
- cstmt.setNString(1, "context");
- cstmt.setNString(2, "typeName");
- cstmt.setNString(eq(3), (String) anyObject());
- cstmt.setNString(eq(4), (String) anyObject());
- cstmt.setNString(eq(5), (String) anyObject());
- cstmt.setNString(6, "prefix");
- cstmt.setNString(7, "sourceName:prefix");
- cstmt.setLong(8, 9999L);
- cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
- expect(cstmt.execute()).andReturn(true);
- expect(cstmt.getLong(9)).andReturn(99L);
- expect(cstmt.wasNull()).andReturn(false);
-
- expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
- cstmt.setLong(1, 99L);
- cstmt.setNString(2, "name");
- cstmt.setNString(3, "1234");
- expect(cstmt.execute()).andReturn(true);
-
- // replay
- replay(configuration, connection, cstmt, record, metric);
-
- SqlServerSink sink = new SqlServerSink();
-
- sink.init(configuration);
-
- SQLServerDriver.setConnection(connection);
-
- sink.putMetrics(record);
-
- verify(configuration, connection, cstmt, record, metric);
- }
+ public abstract void testPutMetrics() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
index 034876a..3e5b70c 100644
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
@@ -18,7 +18,13 @@
package org.apache.hadoop.metrics2.sink;
-import java.lang.Exception;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.log4j.Logger;
+
import java.net.InetAddress;
import java.sql.CallableStatement;
import java.sql.Connection;
@@ -27,18 +33,10 @@ import java.sql.SQLException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.Metric;
-import org.apache.hadoop.metrics2.MetricsException;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsSink;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.log4j.Logger;
-
/**
* This class stores published metrics to the SQL Server database.
*/
-public class SqlServerSink implements MetricsSink {
+public abstract class SqlServerSink implements MetricsSink {
private static final String DATABASE_URL_KEY = "databaseUrl";
private static final boolean DEBUG = true;
private static final String NAME_URL_KEY = "fs.default.name";
@@ -109,22 +107,7 @@ public class SqlServerSink implements MetricsSink {
}
@Override
- public void putMetrics(MetricsRecord record) {
- long metricRecordID = getMetricRecordID(record.context(), record.name(),
- getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), currentServiceName,
- getTagString(record.tags()), record.timestamp());
- if (metricRecordID < 0)
- return;
-
- for (Metric metric : record.metrics()) {
- insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric
- .value()));
- if (metric.name().equals("BlockCapacity")) {
- insertMetricValue(metricRecordID, "BlockSize", Integer
- .toString(blockSize));
- }
- }
- }
+ public abstract void putMetrics(MetricsRecord record);
@Override
public void flush() {
@@ -311,4 +294,12 @@ public class SqlServerSink implements MetricsSink {
}
}
}
+
+ public String getCurrentServiceName() {
+ return currentServiceName;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
new file mode 100644
index 0000000..852fb81
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+public class SqlServerSinkHadoop1 extends SqlServerSink {
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ long metricRecordID = getMetricRecordID(record.context(), record.name(),
+ getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
+ getTagString(record.tags()), record.timestamp());
+ if (metricRecordID < 0)
+ return;
+
+ for (Metric metric : record.metrics()) {
+ insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric
+ .value()));
+ if (metric.name().equals("BlockCapacity")) {
+ insertMetricValue(metricRecordID, "BlockSize", Integer
+ .toString(getBlockSize()));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
new file mode 100644
index 0000000..eae76a8
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+public class SqlServerSinkHadoop2 extends SqlServerSink {
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ long metricRecordID = getMetricRecordID(record.context(), record.name(),
+ getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
+ getTagString(record.tags()), record.timestamp());
+ if (metricRecordID < 0)
+ return;
+
+ for (AbstractMetric metric : record.metrics()) {
+ insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric.value()));
+ if (metric.name().equals("BlockCapacity")) {
+ insertMetricValue(metricRecordID, "BlockSize", Integer
+ .toString(getBlockSize()));
+ }
+ }
+ }
+}