You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ab...@apache.org on 2014/02/26 16:48:04 UTC

[1/2] git commit: BUG-12536 - Hadoop cluster alerts are not being communicated through SysLogger

Repository: ambari
Updated Branches:
  refs/heads/branch-1.2.5 384e18672 -> defe744fc
  refs/heads/trunk d8cfceb9d -> 682e55d04


BUG-12536 - Hadoop cluster alerts are not being communicated through SysLogger


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/defe744f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/defe744f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/defe744f

Branch: refs/heads/branch-1.2.5
Commit: defe744fccd7330eea9cd794beeef6c7564173da
Parents: 384e186
Author: Artem Baranchuk <ab...@hortonworks.com>
Authored: Wed Feb 12 01:28:20 2014 +0200
Committer: Artem Baranchuk <ab...@hortonworks.com>
Committed: Wed Feb 12 01:28:20 2014 +0200

----------------------------------------------------------------------
 .../src/addOns/nagios/plugins/sys_logger.py     | 76 +++++++++++++++-----
 1 file changed, 60 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/defe744f/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
----------------------------------------------------------------------
diff --git a/contrib/addons/src/addOns/nagios/plugins/sys_logger.py b/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
index 7b716c5..251e313 100644
--- a/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
+++ b/contrib/addons/src/addOns/nagios/plugins/sys_logger.py
@@ -30,27 +30,71 @@ degraded_alert_services = ['HBASEMASTER::HBase Master CPU utilization',
 fatal_alert_services = ['NAMENODE::NameNode process down']
 
 # dictionary of service->msg_id mappings
-msg_ids = {'Host::Ping':'host_down', 'HBASEMASTER::HBase Master CPU utilization':'master_cpu_utilization',
-           'HDFS::HDFS capacity utilization':'hdfs_percent_capacity', 'HDFS::Corrupt/Missing blocks':'hdfs_block',
-           'NAMENODE::NameNode edit logs directory status':'namenode_edit_log_write', 'HDFS::Percent DataNodes down':'datanode_down',
-           'DATANODE::DataNode process down':'datanode_process_down', 'HDFS::Percent DataNodes storage full':'datanodes_percent_storage_full',
-           'NAMENODE::NameNode process down':'namenode_process_down', 'HDFS::NameNode RPC latency':'namenode_rpc_latency',
-           'DATANODE::DataNode storage full':'datanodes_storage_full', 'JOBTRACKER::JobTracker process down':'jobtracker_process_down',
-           'MAPREDUCE::JobTracker RPC latency':'jobtracker_rpc_latency', 'MAPREDUCE::Percent TaskTrackers down':'tasktrackers_down',
-           'TASKTRACKER::TaskTracker process down':'tasktracker_process_down', 'HBASEMASTER::HBase Master process down':'hbasemaster_process_down',
-           'REGIONSERVER::RegionServer process down':'regionserver_process_down', 'HBASE::Percent RegionServers down':'regionservers_down',
-           'HIVE-METASTORE::Hive Metastore status check':'hive_metastore_process_down', 'ZOOKEEPER::Percent ZooKeeper Servers down':'zookeepers_down',
-           'ZOOKEEPER::ZooKeeper Server process down':'zookeeper_process_down', 'OOZIE::Oozie Server status check':'oozie_down',
-           'WEBHCAT::WebHCat Server status check':'templeton_down', 'PUPPET::Puppet agent down':'puppet_down',
-           'NAGIOS::Nagios status log staleness':'nagios_status_log_stale', 'GANGLIA::Ganglia [gmetad] process down':'ganglia_process_down',
+msg_ids = {'Host::Ping':'host_down',
+           'HBASEMASTER::HBase Master CPU utilization':'master_cpu_utilization',
+           'HDFS::HDFS capacity utilization':'hdfs_percent_capacity',
+           'HDFS::Corrupt/Missing blocks':'hdfs_block',
+           'NAMENODE::NameNode edit logs directory status':'namenode_edit_log_write',
+           'HDFS::Percent DataNodes down':'datanode_down',
+           'DATANODE::DataNode process down':'datanode_process_down',
+           'HDFS::Percent DataNodes storage full':'datanodes_percent_storage_full',
+           'NAMENODE::NameNode process down':'namenode_process_down',
+           'HDFS::NameNode RPC latency':'namenode_rpc_latency',
+           'DATANODE::DataNode storage full':'datanodes_storage_full',
+           'JOBTRACKER::JobTracker process down':'jobtracker_process_down',
+           'MAPREDUCE::JobTracker RPC latency':'jobtracker_rpc_latency',
+           'MAPREDUCE::Percent TaskTrackers down':'tasktrackers_down',
+           'TASKTRACKER::TaskTracker process down':'tasktracker_process_down',
+           'HBASEMASTER::HBase Master process down':'hbasemaster_process_down',
+           'REGIONSERVER::RegionServer process down':'regionserver_process_down',
+           'HBASE::Percent RegionServers down':'regionservers_down',
+           'HIVE-METASTORE::Hive Metastore status check':'hive_metastore_process_down',
+           'ZOOKEEPER::Percent ZooKeeper Servers down':'zookeepers_down',
+           'ZOOKEEPER::ZooKeeper Server process down':'zookeeper_process_down',
+           'OOZIE::Oozie Server status check':'oozie_down',
+           'WEBHCAT::WebHCat Server status check':'templeton_down',
+           'PUPPET::Puppet agent down':'puppet_down',
+           'NAGIOS::Nagios status log staleness':'nagios_status_log_stale',
+           'GANGLIA::Ganglia [gmetad] process down':'ganglia_process_down',
            'GANGLIA::Ganglia Collector [gmond] process down alert for HBase Master':'ganglia_collector_process_down',
            'GANGLIA::Ganglia Collector [gmond] process down alert for JobTracker':'ganglia_collector_process_down',
            'GANGLIA::Ganglia Collector [gmond] process down alert for NameNode':'ganglia_collector_process_down',
            'GANGLIA::Ganglia Collector [gmond] process down alert for slaves':'ganglia_collector_process_down',
            'NAMENODE::Secondary NameNode process down':'secondary_namenode_process_down',
            'JOBTRACKER::JobTracker CPU utilization':'jobtracker_cpu_utilization',
-           'HBASEMASTER::HBase Master Web UI down':'hbase_ui_down', 'NAMENODE::NameNode Web UI down':'namenode_ui_down',
-           'JOBTRACKER::JobHistory Web UI down':'jobhistory_ui_down', 'JOBTRACKER::JobTracker Web UI down':'jobtracker_ui_down'}
+           'HBASEMASTER::HBase Master Web UI down':'hbase_ui_down',
+           'NAMENODE::NameNode Web UI down':'namenode_ui_down',
+           'JOBTRACKER::JobHistory Web UI down':'jobhistory_ui_down',
+           'JOBTRACKER::JobTracker Web UI down':'jobtracker_ui_down',
+
+
+           'HBASEMASTER::HBaseMaster CPU utilization':'master_cpu_utilization',
+           'HDFS::HDFS Capacity utilization':'hdfs_percent_capacity',
+           'NAMENODE::Namenode Edit logs directory status':'namenode_edit_log_write',
+           'DATANODE::Process down':'datanode_process_down',
+           'NAMENODE::Namenode Process down':'namenode_process_down',
+           'HDFS::Namenode RPC Latency':'namenode_rpc_latency',
+           'DATANODE::Storage full':'datanodes_storage_full',
+           'JOBTRACKER::Jobtracker Process down':'jobtracker_process_down',
+           'MAPREDUCE::JobTracker RPC Latency':'jobtracker_rpc_latency',
+           'TASKTRACKER::Process down':'tasktracker_process_down',
+           'HBASEMASTER::HBaseMaster Process down':'hbasemaster_process_down',
+           'REGIONSERVER::Process down':'regionserver_process_down',
+           'HBASE::Percent region servers down':'regionservers_down',
+           'HIVE-METASTORE::HIVE-METASTORE status check':'hive_metastore_process_down',
+           'ZOOKEEPER::Percent zookeeper servers down':'zookeepers_down',
+           'ZKSERVERS::ZKSERVERS Process down':'zookeeper_process_down',
+           'Oozie status check':'oozie_down',
+           'WEBHCAT::WebHcat status check':'templeton_down',
+           'GANGLIA::Ganglia [gmetad] Process down':'ganglia_process_down',
+           'GANGLIA::Ganglia collector [gmond] Process down alert for hbasemaster':'ganglia_collector_process_down',
+           'GANGLIA::Ganglia collector [gmond] Process down alert for jobtracker':'ganglia_collector_process_down',
+           'GANGLIA::Ganglia collector [gmond] Process down alert for namenode':'ganglia_collector_process_down',
+           'GANGLIA::Ganglia collector [gmond] Process down alert for slaves':'ganglia_collector_process_down',
+           'NAMENODE::Secondary Namenode Process down':'secondary_namenode_process_down',
+           'JOBTRACKER::Jobtracker CPU utilization':'jobtracker_cpu_utilization',
+           'HBASEMASTER::HBase Web UI down':'hbase_ui_down',
+           'NAMENODE::Namenode Web UI down':'namenode_ui_down'}
 
 
 # Determine the severity of the TVI alert based on the Nagios alert state.
@@ -87,7 +131,7 @@ def determine_domain():
 
 # log the TVI msg to the syslog
 def log_tvi_msg(msg):
-    syslog.openlog('Hadoop', syslog.LOG_PID)
+    syslog.openlog('nagios', syslog.LOG_PID)
     syslog.syslog(msg)
 
 


[2/2] git commit: AMBARI-4840 - Need 2 versions of metrics sink

Posted by ab...@apache.org.
AMBARI-4840 - Need 2 versions of metrics sink


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/682e55d0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/682e55d0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/682e55d0

Branch: refs/heads/trunk
Commit: 682e55d04a5a82f42178855fabaa27c573a8558f
Parents: d8cfceb
Author: Artem Baranchuk <ab...@hortonworks.com>
Authored: Wed Feb 26 16:18:41 2014 +0200
Committer: Artem Baranchuk <ab...@hortonworks.com>
Committed: Wed Feb 26 17:43:39 2014 +0200

----------------------------------------------------------------------
 contrib/ambari-scom/metrics-sink/pom.xml        | 69 +++++++++++++-
 .../metrics2/sink/SqlServerSinkHadoop1Test.java | 97 ++++++++++++++++++++
 .../metrics2/sink/SqlServerSinkHadoop2Test.java | 97 ++++++++++++++++++++
 .../hadoop/metrics2/sink/SqlServerSinkTest.java | 83 +++--------------
 .../hadoop/metrics2/sink/SqlServerSink.java     | 43 ++++-----
 .../metrics2/sink/SqlServerSinkHadoop1.java     | 41 +++++++++
 .../metrics2/sink/SqlServerSinkHadoop2.java     | 40 ++++++++
 7 files changed, 368 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/pom.xml b/contrib/ambari-scom/metrics-sink/pom.xml
index c005c2e..7c2706a 100644
--- a/contrib/ambari-scom/metrics-sink/pom.xml
+++ b/contrib/ambari-scom/metrics-sink/pom.xml
@@ -44,13 +44,72 @@
             <version>3.1</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-core</artifactId>
-            <version>[1.0,2.0)</version>
-        </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>hadoop1</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>hadoop1</name>
+                </property>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-core</artifactId>
+                    <version>[1.0,2.0)</version>
+                </dependency>
+            </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <exclude>**/SqlServerSinkHadoop2.java</exclude>
+                            </excludes>
+                            <testExcludes>
+                                <exclude>**/SqlServerSinkHadoop2Test.java</exclude>
+                            </testExcludes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>hadoop2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <version>2.2.0</version>
+                </dependency>
+            </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <exclude>**/SqlServerSinkHadoop1.java</exclude>
+                            </excludes>
+                            <testExcludes>
+                                <exclude>**/SqlServerSinkHadoop1Test.java</exclude>
+                            </testExcludes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
     <build>
         <pluginManagement>
             <plugins>

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
new file mode 100644
index 0000000..4e9c44a
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1Test.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import com.microsoft.sqlserver.jdbc.SQLServerDriver;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.easymock.EasyMock.*;
+
+public class SqlServerSinkHadoop1Test extends SqlServerSinkTest<SqlServerSinkHadoop1> {
+    @Override
+    public SqlServerSinkHadoop1 createInstance() throws InstantiationException, IllegalAccessException {
+        return new SqlServerSinkHadoop1();
+    }
+
+    @Override
+    @Test
+    public void testPutMetrics() throws Exception {
+        SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
+        Connection connection = createNiceMock(Connection.class);
+        CallableStatement cstmt = createNiceMock(CallableStatement.class);
+        MetricsRecord record = createNiceMock(MetricsRecord.class);
+        Metric metric = createNiceMock(Metric.class);
+
+        // set expectations
+        expect(configuration.getParent()).andReturn(null);
+        expect(configuration.getPrefix()).andReturn("prefix");
+        expect(configuration.getString("databaseUrl")).andReturn("url");
+
+        expect(record.context()).andReturn("context");
+        expect(record.name()).andReturn("typeName");
+        expect(record.tags()).andReturn(new HashSet<MetricsTag>());
+        expect(record.timestamp()).andReturn(9999L);
+
+        expect(record.metrics()).andReturn(Collections.singleton(metric));
+
+        expect(metric.name()).andReturn("name").anyTimes();
+        expect(metric.value()).andReturn(1234);
+
+        expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
+        cstmt.setNString(1, "context");
+        cstmt.setNString(2, "typeName");
+        cstmt.setNString(eq(3), (String) anyObject());
+        cstmt.setNString(eq(4), (String) anyObject());
+        cstmt.setNString(eq(5), (String) anyObject());
+        cstmt.setNString(6, "prefix");
+        cstmt.setNString(7, "sourceName:prefix");
+        cstmt.setLong(8, 9999L);
+        cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
+        expect(cstmt.execute()).andReturn(true);
+        expect(cstmt.getLong(9)).andReturn(99L);
+        expect(cstmt.wasNull()).andReturn(false);
+
+        expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
+        cstmt.setLong(1, 99L);
+        cstmt.setNString(2, "name");
+        cstmt.setNString(3, "1234");
+        expect(cstmt.execute()).andReturn(true);
+
+        // replay
+        replay(configuration, connection, cstmt, record, metric);
+
+        SqlServerSink sink = createInstance();
+
+        sink.init(configuration);
+
+        SQLServerDriver.setConnection(connection);
+
+        sink.putMetrics(record);
+
+        verify(configuration, connection, cstmt, record, metric);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
new file mode 100644
index 0000000..dd94fd1
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2Test.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import com.microsoft.sqlserver.jdbc.SQLServerDriver;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.easymock.EasyMock.*;
+
+public class SqlServerSinkHadoop2Test extends SqlServerSinkTest<SqlServerSinkHadoop2> {
+    @Override
+    public SqlServerSinkHadoop2 createInstance() throws InstantiationException, IllegalAccessException {
+        return new SqlServerSinkHadoop2();
+    }
+
+    @Override
+    @Test
+    public void testPutMetrics() throws Exception {
+        SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
+        Connection connection = createNiceMock(Connection.class);
+        CallableStatement cstmt = createNiceMock(CallableStatement.class);
+        MetricsRecord record = createNiceMock(MetricsRecord.class);
+        AbstractMetric metric = createNiceMock(AbstractMetric.class);
+
+        // set expectations
+        expect(configuration.getParent()).andReturn(null);
+        expect(configuration.getPrefix()).andReturn("prefix");
+        expect(configuration.getString("databaseUrl")).andReturn("url");
+
+        expect(record.context()).andReturn("context");
+        expect(record.name()).andReturn("typeName");
+        expect(record.tags()).andReturn(new HashSet<MetricsTag>());
+        expect(record.timestamp()).andReturn(9999L);
+
+        expect(record.metrics()).andReturn(Collections.singleton(metric));
+
+        expect(metric.name()).andReturn("name").anyTimes();
+        expect(metric.value()).andReturn(1234);
+
+        expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
+        cstmt.setNString(1, "context");
+        cstmt.setNString(2, "typeName");
+        cstmt.setNString(eq(3), (String) anyObject());
+        cstmt.setNString(eq(4), (String) anyObject());
+        cstmt.setNString(eq(5), (String) anyObject());
+        cstmt.setNString(6, "prefix");
+        cstmt.setNString(7, "sourceName:prefix");
+        cstmt.setLong(8, 9999L);
+        cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
+        expect(cstmt.execute()).andReturn(true);
+        expect(cstmt.getLong(9)).andReturn(99L);
+        expect(cstmt.wasNull()).andReturn(false);
+
+        expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
+        cstmt.setLong(1, 99L);
+        cstmt.setNString(2, "name");
+        cstmt.setNString(3, "1234");
+        expect(cstmt.execute()).andReturn(true);
+
+        // replay
+        replay(configuration, connection, cstmt, record, metric);
+
+        SqlServerSink sink = createInstance();
+
+        sink.init(configuration);
+
+        SQLServerDriver.setConnection(connection);
+
+        sink.putMetrics(record);
+
+        verify(configuration, connection, cstmt, record, metric);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
index 4d4f6dd..f50dd6e 100644
--- a/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
+++ b/contrib/ambari-scom/metrics-sink/src/Test/java/org/apache/hadoop/metrics2/sink/SqlServerSinkTest.java
@@ -20,20 +20,13 @@ package org.apache.hadoop.metrics2.sink;
 
 import com.microsoft.sqlserver.jdbc.SQLServerDriver;
 import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.Metric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsTag;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.sql.CallableStatement;
 import java.sql.Connection;
-import java.util.Collections;
-import java.util.HashSet;
 
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -41,7 +34,10 @@ import static org.easymock.EasyMock.verify;
 /**
  * SqlServerSink Tests.
  */
-public class SqlServerSinkTest {
+public abstract class SqlServerSinkTest<T extends SqlServerSink> {
+
+  public abstract T createInstance() throws InstantiationException, IllegalAccessException;
+
   @Test
   public void testInit() throws Exception {
 
@@ -55,7 +51,7 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
@@ -76,11 +72,11 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration, connection);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
-    Assert.assertFalse(sink.ensureConnection());
+    Assert.assertTrue(sink.ensureConnection());
 
     SQLServerDriver.setConnection(connection);
     Assert.assertTrue(sink.ensureConnection());
@@ -103,7 +99,7 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration, connection);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
@@ -142,7 +138,7 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration, connection, cstmt);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
@@ -182,7 +178,7 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration, connection, cstmt);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
@@ -214,7 +210,7 @@ public class SqlServerSinkTest {
     // replay
     replay(configuration, connection, cstmt);
 
-    SqlServerSink sink = new SqlServerSink();
+    SqlServerSink sink = createInstance();
 
     sink.init(configuration);
 
@@ -225,60 +221,5 @@ public class SqlServerSinkTest {
     verify(configuration, connection, cstmt);
   }
 
-  @Test
-  public void testPutMetrics() throws Exception {
-    SubsetConfiguration configuration = createNiceMock(SubsetConfiguration.class);
-    Connection connection = createNiceMock(Connection.class);
-    CallableStatement cstmt = createNiceMock(CallableStatement.class);
-    MetricsRecord record = createNiceMock(MetricsRecord.class);
-    Metric metric = createNiceMock(Metric.class);
-
-    // set expectations
-    expect(configuration.getParent()).andReturn(null);
-    expect(configuration.getPrefix()).andReturn("prefix");
-    expect(configuration.getString("databaseUrl")).andReturn("url");
-
-    expect(record.context()).andReturn("context");
-    expect(record.name()).andReturn("typeName");
-    expect(record.tags()).andReturn(new HashSet<MetricsTag>());
-    expect(record.timestamp()).andReturn(9999L);
-
-    expect(record.metrics()).andReturn(Collections.singleton(metric));
-
-    expect(metric.name()).andReturn("name").anyTimes();
-    expect(metric.value()).andReturn(1234);
-
-    expect(connection.prepareCall("{call dbo.uspGetMetricRecord(?, ?, ?, ?, ?, ?, ?, ?, ?)}")).andReturn(cstmt);
-    cstmt.setNString(1, "context");
-    cstmt.setNString(2, "typeName");
-    cstmt.setNString(eq(3), (String) anyObject());
-    cstmt.setNString(eq(4), (String) anyObject());
-    cstmt.setNString(eq(5), (String) anyObject());
-    cstmt.setNString(6, "prefix");
-    cstmt.setNString(7, "sourceName:prefix");
-    cstmt.setLong(8, 9999L);
-    cstmt.registerOutParameter(9, java.sql.Types.BIGINT);
-    expect(cstmt.execute()).andReturn(true);
-    expect(cstmt.getLong(9)).andReturn(99L);
-    expect(cstmt.wasNull()).andReturn(false);
-
-    expect(connection.prepareCall("{call dbo.uspInsertMetricValue(?, ?, ?)}")).andReturn(cstmt);
-    cstmt.setLong(1, 99L);
-    cstmt.setNString(2, "name");
-    cstmt.setNString(3, "1234");
-    expect(cstmt.execute()).andReturn(true);
-
-    // replay
-    replay(configuration, connection, cstmt, record, metric);
-
-    SqlServerSink sink = new SqlServerSink();
-
-    sink.init(configuration);
-
-    SQLServerDriver.setConnection(connection);
-
-    sink.putMetrics(record);
-
-    verify(configuration, connection, cstmt, record, metric);
-  }
+  public abstract void testPutMetrics() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
index 034876a..3e5b70c 100644
--- a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSink.java
@@ -18,7 +18,13 @@
 
 package org.apache.hadoop.metrics2.sink;
 
-import java.lang.Exception;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.log4j.Logger;
+
 import java.net.InetAddress;
 import java.sql.CallableStatement;
 import java.sql.Connection;
@@ -27,18 +33,10 @@ import java.sql.SQLException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.Metric;
-import org.apache.hadoop.metrics2.MetricsException;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsSink;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.log4j.Logger;
-
 /**
  * This class stores published metrics to the SQL Server database.
  */
-public class SqlServerSink implements MetricsSink {
+public abstract class SqlServerSink implements MetricsSink {
   private static final String DATABASE_URL_KEY = "databaseUrl";
   private static final boolean DEBUG = true;
   private static final String NAME_URL_KEY = "fs.default.name";
@@ -109,22 +107,7 @@ public class SqlServerSink implements MetricsSink {
   }
 
   @Override
-  public void putMetrics(MetricsRecord record) {
-    long metricRecordID = getMetricRecordID(record.context(), record.name(),
-        getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), currentServiceName,
-        getTagString(record.tags()), record.timestamp());
-    if (metricRecordID < 0)
-      return;
-
-    for (Metric metric : record.metrics()) {
-      insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric
-          .value()));
-      if (metric.name().equals("BlockCapacity")) {
-        insertMetricValue(metricRecordID, "BlockSize", Integer
-            .toString(blockSize));
-      }
-    }
-  }
+  public abstract void putMetrics(MetricsRecord record);
 
   @Override
   public void flush() {
@@ -311,4 +294,12 @@ public class SqlServerSink implements MetricsSink {
       }
     }
   }
+
+  public String getCurrentServiceName() {
+    return currentServiceName;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
new file mode 100644
index 0000000..852fb81
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop1.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+public class SqlServerSinkHadoop1 extends SqlServerSink {
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    long metricRecordID = getMetricRecordID(record.context(), record.name(),
+            getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
+            getTagString(record.tags()), record.timestamp());
+    if (metricRecordID < 0)
+      return;
+
+    for (Metric metric : record.metrics()) {
+      insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric
+              .value()));
+      if (metric.name().equals("BlockCapacity")) {
+        insertMetricValue(metricRecordID, "BlockSize", Integer
+                .toString(getBlockSize()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/682e55d0/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
new file mode 100644
index 0000000..eae76a8
--- /dev/null
+++ b/contrib/ambari-scom/metrics-sink/src/main/java/org/apache/hadoop/metrics2/sink/SqlServerSinkHadoop2.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+public class SqlServerSinkHadoop2 extends SqlServerSink {
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    long metricRecordID = getMetricRecordID(record.context(), record.name(),
+            getLocalNodeName(), getLocalNodeIPAddress(), getClusterNodeName(), getCurrentServiceName(),
+            getTagString(record.tags()), record.timestamp());
+    if (metricRecordID < 0)
+      return;
+
+    for (AbstractMetric metric : record.metrics()) {
+      insertMetricValue(metricRecordID, metric.name(), String.valueOf(metric.value()));
+      if (metric.name().equals("BlockCapacity")) {
+        insertMetricValue(metricRecordID, "BlockSize", Integer
+                .toString(getBlockSize()));
+      }
+    }
+  }
+}