You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/01 16:54:02 UTC
[27/50] [abbrv] usergrid git commit: Initial checkin for Python
Utilities and SDK
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py b/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py
new file mode 100644
index 0000000..98d4373
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py
@@ -0,0 +1,270 @@
+import json
+import re
+import traceback
+from multiprocessing.pool import Pool
+import requests
+
# Search URL template: pages through an index 'size' docs at a time, excluding
# edges named 'zzzcollzzz|logs' via the Lucene q= syntax.
index_url_template = 'http://localhost:9200/{index_name}/_search?size={size}&from={from_var}&q=-edgeName:zzzcollzzz|logs'

# ES index names to iterate (placeholder; replace before running).
index_names = [
    'es-index-name'
]

# Usergrid entity URL template (unused at module level; work() builds the same URL inline).
baas_url = 'http://localhost:8080/org/{app_id}/{collection}/{entity_id}'

# Maps lower-cased field-name parts from the ES document back to the camelCase
# key actually used in the entity JSON.
field_part_map = {
    'mockdata': 'mockData'
}
+
+
def update_entity_field(entity, field_name, field_value):
    """Set the trailing 'size' element of dotted path *field_name* to *field_value*
    in a shallow copy of *entity*, returning the copy.

    Only paths ending in '.size' are supported; anything else prints the parts
    and calls exit(). Each path segment is remapped through the module-level
    field_part_map (e.g. 'mockdata' -> 'mockData') before lookup. Lists on the
    path are handled by recursing into every element when the next segment is
    the 'size' leaf, or by descending into a single-element list.

    NOTE(review): entity.copy() is shallow, so nested dicts/lists are shared
    with (and mutated in) the caller's object.
    """
    entity_copy = entity.copy()

    # 'worked' is set on a missing-field break but never read afterwards;
    # 'is_array' is assigned and never used.
    worked = True
    is_array = False
    array_length = 0

    try:
        parts = field_name.split('.')

        # Only '<path>.size' updates are supported; bail out hard otherwise.
        if parts[len(parts) - 1] != 'size':
            print parts
            exit()

        # Walk entity_copy down the dotted path, stopping at the 'size' leaf.
        change_me = entity_copy

        for i, field_part in enumerate(parts):
            field_part = field_part_map.get(field_part, field_part)

            if field_part == 'size':
                break

            if isinstance(change_me, dict):
                if field_part not in change_me:
                    worked = False
                    # print 'ERROR! field [%s] not in entity: %s' % (field_part, json.dumps(change_me))
                    break

                change_me = change_me[field_part]

            elif isinstance(change_me, list):
                array_length = len(change_me)

                # Next segment is the 'size' leaf: recurse into each array element.
                if i == len(parts) - 2 and len(parts) > i + 1 and parts[i + 1] == 'size':

                    for j in xrange(0, len(change_me)):
                        print 'arrau!'
                        change_me[j] = update_entity_field(change_me[j], '.'.join(parts[i:]), field_value)
                        # element['size'] = field_value

                elif len(change_me) == 1:
                    print 'single array'
                    change_me = change_me[0][field_part]
                else:
                    print 'WTF!'

        # Set the leaf. Failures (e.g. change_me is not a dict) are only reported
        # when the path did not go through a single-element array.
        try:
            change_me['size'] = field_value
        except:
            if array_length != 1:
                print traceback.format_exc()
                print 'damn'

    except:
        print '---Error updating field [%s] in document: %s' % (field_name, json.dumps(entity))
        print traceback.format_exc()

    if array_length > 1:
        print '++++++++ARRAY!!!!! %s' % array_length

    return entity_copy
+
+
+def update_entity_fields(entity, fields):
+ entity_copy = entity.copy()
+
+ for field in fields:
+ field_name = field.get('name')
+
+ if 'string' in field:
+ field_value = field.get('string')
+
+ elif 'long' in field:
+ field_value = field.get('long')
+
+ else:
+ print 'WTF! %s' % json.dumps(field)
+ return entity_copy
+
+ entity_copy = update_entity_field(entity_copy, field_name, field_value)
+
+ return entity_copy
+
+
# NOTE(review): 'my' and 'fields' below look like leftover manual test fixtures
# for update_entity_fields(); nothing else in this module reads them.
my = {
    'foo': {
        'bar': {
            'color': 'red'
        }
    }
}

fields = [
    {
        'name': 'foo.size',
        'string': '2'
    },
    {
        'name': 'foo.bar.size',
        'long': 2
    }
]
+
+
+def work(item):
+ try:
+ url = 'http://localhost:8080/org/{app_id}/{collection}/{entity_id}'.format(
+ app_id=item[0],
+ collection=item[1],
+ entity_id=item[2]
+ )
+ print url
+ r_get = requests.get(url)
+
+ if r_get.status_code != 200:
+ print 'ERROR GETTING ENTITY AT URL: %s' % url
+ return
+
+ response_json = r_get.json()
+
+ entities = response_json.get('entities')
+
+ if len(entities) <= 0:
+ print 'TOO MANY ENTITIES AT URL: %s' % url
+ return
+
+ entity = entities[0]
+
+ new_entity = update_entity_fields(entity, item[3])
+
+ with open('/Users/ApigeeCorporation/tmp/hack/%s.json' % item[2], 'w') as f:
+ json.dump(entity, f, indent=2)
+
+ with open('/Users/ApigeeCorporation/tmp/hack/%s_new.json' % item[2], 'w') as f:
+ json.dump(new_entity, f, indent=2)
+
+ r_put = requests.put(url, data=json.dumps(new_entity))
+
+ if r_put.status_code == 200:
+ print 'PUT [%s]: %s' % (r_put.status_code, url)
+ pass
+ elif r_put.status_code:
+ print 'PUT [%s]: %s | %s' % (r_put.status_code, url, r_put.text)
+
+ except:
+ print traceback.format_exc()
+
+
POOL_SIZE = 4

counter = 0
# NOTE(review): the first 'size' assignment is immediately overridden below.
size = POOL_SIZE * 10
size = 1000

# Total document count used only as a stop condition for the paging loop.
total_docs = 167501577
start_from = 0
from_var = 0
page = 0

work_items = []

pool = Pool(POOL_SIZE)

keep_going = True

# Page through each index, collect (app_id, collection, entity_id, fields) work
# items for the hard-coded target app, and fan them out to the process pool.
while keep_going:
    work_items = []

    if from_var > total_docs:
        keep_going = False
        break

    from_var = start_from + (page * size)
    page += 1

    for index_name in index_names:

        index_url = index_url_template.format(index_name=index_name, size=size, from_var=from_var)

        print 'Getting URL: ' + index_url

        r = requests.get(index_url)

        if r.status_code != 200:
            print r.text
            exit()

        response = r.json()

        hits = response.get('hits', {}).get('hits')

        # NOTE(review): these regexes are loop-invariant and could be compiled
        # once at module level. They pull the UUID/type out of strings shaped
        # like 'appId(<uuid>,...)' and 'entityId(<uuid>,<type>)'.
        re_app_id = re.compile('appId\((.+),')
        re_ent_id = re.compile('entityId\((.+),')
        re_type = re.compile('entityId\(.+,(.+)\)')

        print 'Index: %s | hits: %s' % (index_name, len(hits))

        if len(hits) == 0:
            keep_going = False
            break

        for hit_data in hits:
            source = hit_data.get('_source')

            application_id = source.get('applicationId')

            app_id_find = re_app_id.findall(application_id)

            if len(app_id_find) > 0:
                app_id = app_id_find[0]

                # Only one specific application is processed; everything else is skipped.
                if app_id != '5f20f423-f2a8-11e4-a478-12a5923b55dc':
                    print 'SKIPP APP ID: ' + app_id
                    continue

                entity_id_tmp = source.get('entityId')

                entity_id_find = re_ent_id.findall(entity_id_tmp)
                entity_type_find = re_type.findall(entity_id_tmp)

                if len(entity_id_find) > 0 and len(entity_type_find) > 0:
                    entity_id = entity_id_find[0]
                    collection = entity_type_find[0]
                    fields_to_update = []

                    # Collect only the flattened fields whose name ends in '.size'.
                    for field in source.get('fields'):
                        if field.get('name')[-5:] == '.size':
                            fields_to_update.append(field)

                    print json.dumps(source)

                    work_items.append((app_id, collection, entity_id, fields_to_update))

                    counter += 1

    print 'Work Items: %s' % len(work_items)

    # NOTE(review): pool.map is invoked TWICE on the same work_items (the second
    # wrapped in a swallow-all retry), so every entity is fetched and PUT twice
    # per page. Confirm whether the second call is an intentional retry.
    try:
        pool.map(work, work_items)


    except:
        print traceback.format_exc()

    try:
        pool.map(work, work_items)
    except:
        pass

    print 'Work Done!'

print 'done: %s' % counter
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_prefix_checker.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/index_prefix_checker.py b/utils/usergrid-util-python/es_tools/index_prefix_checker.py
new file mode 100644
index 0000000..d72ff3d
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/index_prefix_checker.py
@@ -0,0 +1,81 @@
+import json
+from collections import defaultdict
+import requests
+import logging
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+# This script iterates all the indexes in an ES cluster and aggregates the size by the prefix
+
url_base = 'http://localhost:9200'

r = requests.get(url_base + "/_stats")
# NOTE(review): 'response' is never read, and r.json() is parsed a second time
# on the next line.
response = r.json()

indices = r.json()['indices']

print 'retrieved %s indices' % len(indices)

# NOTE(review): unused in this script.
NUMBER_VALUE = 0

# Substrings an index name must contain to be counted (empty list = count all).
includes = [
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9',
]

# Substrings that exclude an index name from counting (overrides includes).
excludes = [
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8',
]

counter = 0
process = False

# Aggregates keyed by index-name prefix; 'total' accumulates across all prefixes.
counts = defaultdict(int)
sizes = defaultdict(int)
indexes = {}

for index, index_data in indices.iteritems():
    process = False
    counter += 1

    if 'management' in index:
        print index

    # print 'index %s of %s' % (counter, len(indices))

    if len(includes) == 0:
        process = True
    else:
        for include in includes:

            if include in index:
                process = True

    if len(excludes) > 0:
        for exclude in excludes:
            if exclude in index:
                process = False

    if process:
        # print index
        # Prefix is the part before the first '__', '^' or '_' separator,
        # checked in that order of precedence.
        if '__' in index:
            index_prefix = index.split('__')[0]
        elif '^' in index:
            index_prefix = index.split('^')[0]
        else:
            index_prefix = index.split('_')[0]

        if index_prefix not in indexes:
            indexes[index_prefix] = []

        indexes[index_prefix].append(index)

        counts[index_prefix] += 1
        counts['total'] += 1
        # Store size converted from bytes to GB.
        sizes[index_prefix] += (float(index_data.get('total', {}).get('store', {}).get('size_in_bytes')) / 1e+9)
        sizes['total'] += (float(index_data.get('total', {}).get('store', {}).get('size_in_bytes')) / 1e+9)

print 'Number of indices (US-EAST):'
print json.dumps(counts, indent=2)
print 'Size in GB'
print json.dumps(sizes, indent=2)
print json.dumps(indexes, indent=2)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_replica_setter.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/index_replica_setter.py b/utils/usergrid-util-python/es_tools/index_replica_setter.py
new file mode 100644
index 0000000..7180fed
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/index_replica_setter.py
@@ -0,0 +1,118 @@
+from multiprocessing import Pool
+import requests
+import time
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+# utility for updating the replicas of a set of indexes that are no longer needed. Given:
+# A) a set of strings to include when evaluating the index names to update
+# B) a set of strings to Exclude when evaluating the index names to update
+#
+# The general logic is:
+# 1) If the include set is empty, or if the index name contains a string in the 'include' set, then update
+# 2) If the index contains a string in the exclude list, do not update
+
+
url_base = 'http://localhost:9200'

# r = requests.get(url_base + "/_cat/indices?v")
# print r.text

r = requests.get(url_base + "/_stats")

# print json.dumps(r.json(), indent=2)

indices = r.json()['indices']

print 'retrieved %s indices' % len(indices)

# Target replica count applied to every matched index.
NUMBER_VALUE = 1

payload = {
    "index.number_of_replicas": NUMBER_VALUE,
}

# indices = ['usergrid__a34ad389-b626-11e4-848f-06b49118d7d0__application_manual']

# Substrings an index name must contain to be updated (empty list = update all).
includes = [
    # '70be096e-c2e1-11e4-8a55-12b4f5e28868',
    # 'b0c640af-bc6c-11e4-b078-12b4f5e28868',
    # 'e62e465e-bccc-11e4-b078-12b4f5e28868',
    # 'd82b6413-bccc-11e4-b078-12b4f5e28868',
    # '45914256-c27f-11e4-8a55-12b4f5e28868',
    # '2776a776-c27f-11e4-8a55-12b4f5e28868',
    # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd',
    # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868',
    # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd',
    # '3d748996-c27f-11e4-8a55-12b4f5e28868',
    # '1daab807-c27f-11e4-8a55-12b4f5e28868',
    # 'd0c4f0da-d961-11e4-849d-12b4f5e28868',
    # '93e756ac-bc4e-11e4-92ae-12b4f5e28868',
]

# Substrings that exclude an index name from updating (overrides includes).
excludes = [
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8',
    # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3',
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9',
    # 'a34ad389-b626-11e4-848f-06b49118d7d0'
]

counter = 0
update = False
# print 'sleeping 1200s'
# time.sleep(1200)

index_names = sorted([index for index in indices])
+
+
+def update_shards(index_name):
+ update = False
+ # counter += 1
+ #
+ # print 'index %s of %s' % (counter, len(indices))
+
+ if len(includes) == 0:
+ update = True
+ else:
+ for include in includes:
+
+ if include in index_name:
+ update = True
+
+ if len(excludes) > 0:
+ for exclude in excludes:
+ if exclude in index_name:
+ update = False
+
+ if update:
+ print index_name
+
+ # url = '%s/%s/_settings' % (url_base, index)
+ # print url
+ #
+ # response = requests.get('%s/%s/_settings' % (url_base, index))
+ # settings = response.json()
+ #
+ # index_settings = settings[index]['settings']['index']
+ #
+ # current_replicas = int(index_settings.get('number_of_replicas'))
+ #
+ # if current_replicas == NUMBER_VALUE:
+ # continue
+
+ success = False
+
+ while not success:
+
+ response = requests.put('%s/%s/_settings' % (url_base, index_name), data=payload)
+
+ if response.status_code == 200:
+ success = True
+ print '200: %s: %s' % (index_name, response.text)
+ else:
+ print '%s: %s: %s' % (response.status_code, index_name, response.text)
+
+
# Fan the per-index settings updates out across 8 worker processes.
pool = Pool(8)

pool.map(update_shards, index_names)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_shard_allocator.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/index_shard_allocator.py b/utils/usergrid-util-python/es_tools/index_shard_allocator.py
new file mode 100644
index 0000000..ecee095
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/index_shard_allocator.py
@@ -0,0 +1,148 @@
+import json
+from multiprocessing import Pool
+
+import requests
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+# The purpose of this script is to update the shard allocation of ElasticSearch for specific indexes to be set to
+# specific nodes. The reason for doing this is to isolate the nodes on which certain indexes run for specific
+# customers due to load, disk size or any other factors.
+
+
# Host names of the c3.2xlarge node group (currently unused; kept so the
# 'nodes' assignment below can be switched between groups).
nodes_c32xl = [
    'res000eu',
    'res001eu',
    'res002eu',
    'res003eu',
    'res004eu',
    'res005eu',
    'res009eu',
    'res010eu',
    'res011eu',
    'res012eu',
    'res013eu',
    'res014eu',
]

# Host names of the c3.4xlarge node group.
nodes_c34xl = [
    'res015eu',
    'res018eu',
    'res019eu',
    'res020eu',
    'res021eu',
    'res022eu',
    'res023eu',
    'res024eu',
    'res025eu',
    'res026eu',
    'res027eu',
    'res028eu'
]

# Active node set used to build the allocation filter below.
nodes = nodes_c34xl

url_base = 'http://localhost:9200'

nodes_string = ",".join(nodes)

# Routing-allocation filter: clears the include list and EXCLUDES the listed
# hosts, i.e. shards of matched indexes will be moved OFF these nodes.
payload = {
    "index.routing.allocation.include._host": "",
    "index.routing.allocation.exclude._host": nodes_string
}

# payload = {
#     "index.routing.allocation.include._host": "",
#     "index.routing.allocation.exclude._host": ""
# }

print json.dumps(payload )

r = requests.get(url_base + "/_stats")
indices = r.json()['indices']

print 'retrieved %s indices' % len(indices)
+
# Substrings an index name must contain to be updated (empty list = update all).
includes = [
    # '70be096e-c2e1-11e4-8a55-12b4f5e28868',
    # 'b0c640af-bc6c-11e4-b078-12b4f5e28868',
    # 'e62e465e-bccc-11e4-b078-12b4f5e28868',
    # 'd82b6413-bccc-11e4-b078-12b4f5e28868',
    # '45914256-c27f-11e4-8a55-12b4f5e28868',
    # '2776a776-c27f-11e4-8a55-12b4f5e28868',
    # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd',
    # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868',
    # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd',
    # '3d748996-c27f-11e4-8a55-12b4f5e28868',
    # '1daab807-c27f-11e4-8a55-12b4f5e28868',
    # 'd0c4f0da-d961-11e4-849d-12b4f5e28868',
    # '93e756ac-bc4e-11e4-92ae-12b4f5e28868',
    #
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8',
    # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3',
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9',
]

# Substrings that exclude an index name from updating (overrides includes).
excludes = [
    #
    # '70be096e-c2e1-11e4-8a55-12b4f5e28868',
    # 'b0c640af-bc6c-11e4-b078-12b4f5e28868',
    # 'e62e465e-bccc-11e4-b078-12b4f5e28868',
    # 'd82b6413-bccc-11e4-b078-12b4f5e28868',
    # '45914256-c27f-11e4-8a55-12b4f5e28868',
    # '2776a776-c27f-11e4-8a55-12b4f5e28868',
    # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd',
    # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868',
    # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd',
    # '3d748996-c27f-11e4-8a55-12b4f5e28868',
    # '1daab807-c27f-11e4-8a55-12b4f5e28868',
    # 'd0c4f0da-d961-11e4-849d-12b4f5e28868',
    # '93e756ac-bc4e-11e4-92ae-12b4f5e28868',
    #
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8',
    # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3',
    # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9',
]

counter = 0
update = False

# Apply the allocation-filter payload to every matched index, retrying each
# PUT until ES answers 200.
for index_name in indices:
    update = False
    counter += 1

    # print 'Checking index %s of %s: %s' % (counter, len(indices), index_name)

    if len(includes) == 0:
        update = True
    else:
        for include in includes:

            if include in index_name:
                update = True

    if len(excludes) > 0:
        for exclude in excludes:
            if exclude in index_name:
                update = False

    if not update:
        print 'Skipping %s of %s: %s' % (counter, len(indices), index_name)
    else:
        print '+++++Processing %s of %s: %s' % (counter, len(indices), index_name)

        # NOTE(review): url_template is only printed; the PUT below rebuilds the
        # same URL inline.
        url_template = '%s/%s/_settings' % (url_base, index_name)
        print url_template

        success = False

        while not success:

            response = requests.put('%s/%s/_settings' % (url_base, index_name), data=json.dumps(payload))

            if response.status_code == 200:
                success = True
                print '200: %s: %s' % (index_name, response.text)
            else:
                print '%s: %s: %s' % (response.status_code, index_name, response.text)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/mapping_deleter.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/mapping_deleter.py b/utils/usergrid-util-python/es_tools/mapping_deleter.py
new file mode 100644
index 0000000..74ad898
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/mapping_deleter.py
@@ -0,0 +1,34 @@
+import json
+
+import requests
+
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
url_base = 'http://localhost:9200'

# Index whose type mappings will be deleted ('_default_' is preserved).
SOURCE_INDEX = '5f20f423-f2a8-11e4-a478-12a5923b55dc__application_v6'

url_template = '%s/{index_name}/_mapping' % url_base

source_index_url = '%s/%s' % (url_base, SOURCE_INDEX)

index_name = SOURCE_INDEX

index_data = requests.get(url_template.format(index_name=index_name)).json()

mappings = index_data.get(index_name, {}).get('mappings', {})

# WARNING(review): deleting a type mapping in ES also removes that type's
# documents (destructive) — confirm against the cluster's ES version docs
# before running.
for type_name, mapping_detail in mappings.iteritems():
    print 'Index: %s | Type: %s: | Properties: %s' % (index_name, type_name, len(mappings[type_name]['properties']))

    if type_name == '_default_':
        continue

    r = requests.delete('%s/%s/_mapping/%s' % (url_base, index_name, type_name))

    print '%s: %s' % (r.status_code, r.text)

    # print json.dumps(r.json(), indent=2)
    # time.sleep(5)
    print '---'
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/mapping_retriever.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/mapping_retriever.py b/utils/usergrid-util-python/es_tools/mapping_retriever.py
new file mode 100644
index 0000000..0da123b
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/mapping_retriever.py
@@ -0,0 +1,45 @@
+import json
+import requests
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+# Utility to iterate the mappings for an index and save them locally
+
url_base = 'http://localhost:9200'

# r = requests.get(url_base + "/_stats")
#
# indices = r.json()['indices']

url_template = '%s/{index_name}/_mapping' % url_base

# Index whose per-type mappings are fetched and written to local JSON files.
SOURCE_INDEX = '5f20f423-f2a8-11e4-a478-12a5923b55dc__application_v7'

source_index_url = '%s/%s' % (url_base, SOURCE_INDEX)

index_name = SOURCE_INDEX
print 'Getting ' + url_template.format(index_name=index_name)

r = requests.get(url_template.format(index_name=index_name))
index_data = r.json()

mappings = index_data.get(index_name, {}).get('mappings', {})

for type_name, mapping_detail in mappings.iteritems():
    print 'Index: %s | Type: %s: | Properties: %s' % (index_name, type_name, len(mappings[type_name]['properties']))

    print 'Processing %s' % type_name

    # NOTE(review): hardcoded developer-machine path; parameterize before reuse.
    filename = '/Users/ApigeeCorporation/tmp/%s_%s_source_mapping.json' % (
        SOURCE_INDEX, type_name)

    print filename

    with open(filename, 'w') as f:
        json.dump({type_name: mapping_detail}, f, indent=2)

    # print '%s' % (r.status_code, r.text)

    # print json.dumps(r.json(), indent=2)
    # time.sleep(5)
    # NOTE(review): printed once per type because it is inside the loop;
    # probably intended to run after the loop completes.
    print 'Done!'
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/monitor_tasks.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/es_tools/monitor_tasks.py b/utils/usergrid-util-python/es_tools/monitor_tasks.py
new file mode 100644
index 0000000..df23d49
--- /dev/null
+++ b/utils/usergrid-util-python/es_tools/monitor_tasks.py
@@ -0,0 +1,41 @@
+import datetime
+import requests
+import time
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+# Utility for monitoring pending tasks in ElasticSearch
+
def total_milliseconds(td):
    """Return the total length of timedelta *td* in milliseconds.

    BUG FIX: the original computed (td.microseconds + td.seconds * 1000000) / 1000,
    silently dropping td.days — e.g. timedelta(days=1) came back as 0 ms.
    """
    return (td.days * 86400000000 + td.seconds * 1000000 + td.microseconds) / 1000
+
+
+url_template = "http://localhost:9200/_cat/pending_tasks?v'"
+
+x = 0
+
+SLEEP_TIME = 3
+
+while True:
+ x += 13
+ try:
+
+ r = requests.get(url=url_template)
+ lines = r.text.split('\n')
+
+ print '\n-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-'
+ print '+++++++++++++++++++++++++++++++++++++++++++++++++++++++++'
+ print datetime.datetime.utcnow()
+ if len(lines) > 1:
+ print r.text
+ else:
+ print 'None'
+
+ print '-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-'
+ print '-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-\n'
+
+ except:
+ pass
+
+ time.sleep(SLEEP_TIME)
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/README.md
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/index_test/README.md b/utils/usergrid-util-python/index_test/README.md
new file mode 100644
index 0000000..eed7f1c
--- /dev/null
+++ b/utils/usergrid-util-python/index_test/README.md
@@ -0,0 +1 @@
+This set of scripts was intended to test indexing times and sizes for the new indexing scheme
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/document_creator.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/index_test/document_creator.py b/utils/usergrid-util-python/index_test/document_creator.py
new file mode 100644
index 0000000..fd544c6
--- /dev/null
+++ b/utils/usergrid-util-python/index_test/document_creator.py
@@ -0,0 +1,254 @@
+from __future__ import print_function
+from Queue import Empty
+import json
+from multiprocessing import JoinableQueue, Process
+import random
+import re
+import uuid
+import sys
+
+import argparse
+
+import loremipsum
+
+
def parse_args():
    """Parse command-line options for the document generator and return them as a dict.

    Options: worker count, total document count, output filename, and the
    min/max number of fields per generated document. Note the py2-only 'long'
    type converters and the '-tp/--type_prefix' option, which is parsed but not
    read elsewhere in this module.
    """
    parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1')

    parser.add_argument('-w', '--workers',
                        help='The number of worker threads',
                        type=int,
                        default=8)

    parser.add_argument('-dc', '--document_count',
                        help='The number of documents per index',
                        type=long,
                        default=100000000)

    parser.add_argument('--output',
                        help='The filename to write to',
                        type=str,
                        default='generated_documents.txt')

    parser.add_argument('--fields_min',
                        help='The min number of fields per document',
                        type=long,
                        default=10)

    parser.add_argument('--fields_max',
                        help='The max number of fields per document',
                        type=long,
                        default=100)

    parser.add_argument('-tp', '--type_prefix',
                        help='The Prefix to use for type names',
                        type=str,
                        default='type_this')

    my_args = parser.parse_args(sys.argv[1:])

    # vars() converts the Namespace to a plain dict (accessed as args[...] below).
    return vars(my_args)
+
+
# Parse CLI options once at import time; worker/writer processes read this
# module-level dict.
args = parse_args()

# Shared pool of lorem-ipsum sentences (third-party 'loremipsum' package).
sentence_list = loremipsum.get_sentences(10000)
+
+
class Worker(Process):
    """Worker process that generates random nested documents and flattens them.

    Pulls task dicts ({'fields_min', 'fields_max', 'uuid'}) from work_queue,
    builds a random document, flattens it into the Usergrid-style
    {'entityId', 'entityVersion', 'applicationId', 'fields': [...]} shape, and
    pushes the result onto response_queue.
    """

    def __init__(self, work_queue, response_queue):
        super(Worker, self).__init__()
        self.work_queue = work_queue
        self.response_queue = response_queue
        self.sentence_list = loremipsum.get_sentences(1000)
        # NOTE(review): '[A-z]' also matches '[', ']', '^', '_' and backtick;
        # '[A-Za-z]' was probably intended — kept as-is to preserve output.
        self.re_first_word = re.compile('([A-z]+)')

    def run(self):
        print('Starting %s ' % self.name)

        while True:
            # Blocks up to 10 minutes; a timeout raises Empty and ends the worker.
            task = self.work_queue.get(timeout=600)
            field_count = random.randint(task['fields_min'], task['fields_max'])
            document = self.generate_document(field_count)
            flattened_doc = self.process_document(document,
                                                  task['uuid'],
                                                  task['uuid'])

            self.response_queue.put(flattened_doc)

            self.work_queue.task_done()

    def generate_document(self, fields):
        """Build a random document with roughly `fields` fields of mixed types."""
        doc = {}

        my_bool = True

        for i in xrange(fields):
            sentence_index = random.randint(0, max((fields / 2) - 1, 1))
            sentence = self.sentence_list[sentence_index]

            # Key is the sentence's second word; half the time the loop index is
            # appended to reduce key collisions.
            if random.random() >= .5:
                key = self.re_first_word.findall(sentence)[1]
            else:
                key = self.re_first_word.findall(sentence)[1] + str(i)

            field_type = random.random()

            # Bucketed type distribution: 30% string, 20% int, 10% float,
            # 10% boolean, 10% nested document, 20% location.
            # BUG FIX: the boolean and nested-document branches compared
            # random.random() with '==' (== 0.7, == 0.8), which is almost never
            # true, making those branches effectively unreachable; '<=' restores
            # the intended bucketing.
            if field_type <= 0.3:
                doc[key] = sentence

            elif field_type <= 0.5:
                doc[key] = random.randint(1, 1000000)

            elif field_type <= 0.6:
                doc[key] = random.random() * 1000000000

            elif field_type <= 0.7:
                # Alternate True/False across boolean fields.
                doc[key] = my_bool
                my_bool = not my_bool

            elif field_type <= 0.8:
                doc[key] = self.generate_document(max(fields / 5, 1))

            elif field_type <= 1.0:
                doc['mylocation'] = self.generate_location()

        return doc

    @staticmethod
    def get_fields(document, base_name=None):
        """Flatten a (possibly nested) document into a list of typed field dicts.

        Each leaf becomes {'name': 'dotted.path', '<type>': value} where <type>
        is one of 'string', 'boolean', 'long' or 'double'; unrecognized values
        are stringified under 'string'.
        """
        fields = []

        for name, value in document.iteritems():
            if base_name:
                field_name = '%s.%s' % (base_name, name)
            else:
                field_name = name

            if isinstance(value, dict):
                fields += Worker.get_fields(value, field_name)
            else:
                value_name = None
                # NOTE: bool is checked before int on purpose in neither case —
                # isinstance(True, (int, long)) is True, but the basestring/bool
                # checks run first, so booleans are labeled 'boolean'.
                if isinstance(value, basestring):
                    value_name = 'string'

                elif isinstance(value, bool):
                    value_name = 'boolean'

                elif isinstance(value, (int, long)):
                    value_name = 'long'

                elif isinstance(value, float):
                    value_name = 'double'

                if value_name:
                    field = {
                        'name': field_name,
                        value_name: value
                    }
                else:
                    field = {
                        'name': field_name,
                        'string': str(value)
                    }

                fields.append(field)

        return fields

    @staticmethod
    def process_document(document, application_id, uuid):
        """Wrap the flattened fields in the Usergrid ES document envelope."""
        response = {
            'entityId': uuid,
            'entityVersion': '1',
            'applicationId': application_id,
            'fields': Worker.get_fields(document)
        }

        return response

    def generate_location(self):
        """Return {'location': {'lat': ..., 'lon': ...}} with random coordinates.

        BUG FIX: the original decided lat's sign from 'lon > .5' and lon's sign
        from 'lat > .5' (crossed variables, and comparing degree values to 0.5
        made negation near-certain). An independent 50/50 flip per coordinate
        appears to be the intent.
        """
        lat = random.random() * 90.0
        lon = random.random() * 180.0

        if random.random() < 0.5:
            lat = -lat
        if random.random() < 0.5:
            lon = -lon

        return {
            'location': {
                'lat': lat,
                'lon': lon
            }
        }
+
+
class Writer(Process):
    """Consumer process that drains document_queue, writing one JSON document
    per line to the output file named by the module-level args dict."""

    def __init__(self, document_queue):
        super(Writer, self).__init__()
        self.document_queue = document_queue

    def run(self):
        # Stream documents until the queue stays empty for 300 seconds.
        with open(args['output'], 'w') as out_file:
            while True:
                try:
                    doc = self.document_queue.get(timeout=300)
                except Empty:
                    print('done!')
                    break
                print(json.dumps(doc), file=out_file)
+
+
def total_milliseconds(td):
    """Return the total length of timedelta *td* in milliseconds.

    BUG FIX: the original computed (td.microseconds + td.seconds * 1000000) / 1000,
    silently dropping td.days — e.g. timedelta(days=1) came back as 0 ms.
    (Unused in this module, but kept for parity with es_tools/monitor_tasks.py.)
    """
    return (td.days * 86400000000 + td.seconds * 1000000 + td.microseconds) / 1000
+
+
def main():
    """Start the generator workers and the writer, then enqueue generation tasks
    in batches, joining both queues after each batch."""
    # Queues connecting the generator workers to the single writer process.
    work_queue = JoinableQueue()
    response_queue = JoinableQueue()

    workers = [Worker(work_queue, response_queue) for x in xrange(args.get('workers'))]

    writer = Writer(response_queue)
    writer.start()

    [worker.start() for worker in workers]

    try:
        total_messages = args.get('document_count')
        batch_size = 100000
        message_counter = 0

        # NOTE(review): this enqueues total_messages * batch_size tasks in total
        # (batch_size per outer iteration) — confirm whether the outer loop was
        # meant to iterate total_messages / batch_size times instead.
        for doc_number in xrange(total_messages):
            message_counter += 1

            for count in xrange(batch_size):
                doc_id = str(uuid.uuid1())

                task = {
                    'fields_min': args['fields_min'],
                    'fields_max': args['fields_max'],
                    'uuid': doc_id
                }

                work_queue.put(task)

            # Wait for the current batch to be fully generated and written out.
            print('Joining queues counter=[%s]...' % message_counter)
            work_queue.join()
            response_queue.join()
            print('Done queue counter=[%s]...' % message_counter)

    except KeyboardInterrupt:
        # Ctrl-C: hard-stop the generator workers (the writer exits on queue timeout).
        [worker.terminate() for worker in workers]
+
+
+main()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/index_test_mixed_batch.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/index_test/index_test_mixed_batch.py b/utils/usergrid-util-python/index_test/index_test_mixed_batch.py
new file mode 100644
index 0000000..d1dd40c
--- /dev/null
+++ b/utils/usergrid-util-python/index_test/index_test_mixed_batch.py
@@ -0,0 +1,545 @@
+import json
+from multiprocessing import JoinableQueue, Process
+import random
+import re
+import traceback
+import uuid
+import time
+import sys
+
+import argparse
+import loremipsum
+import requests
+from elasticsearch import Elasticsearch
+
+
# Elasticsearch cluster nodes; each worker pins its HTTP client to one
# randomly-chosen host from this list.
es_hosts = [
    {'host': 'ees000wo', 'port': 9200},
    {'host': 'ees001wo', 'port': 9200},
    {'host': 'ees002wo', 'port': 9200},
    {'host': 'ees003wo', 'port': 9200},
    {'host': 'ees004wo', 'port': 9200},
    {'host': 'ees005wo', 'port': 9200},
    {'host': 'ees006wo', 'port': 9200},
    {'host': 'ees007wo', 'port': 9200},
    {'host': 'ees008wo', 'port': 9200},
    {'host': 'ees009wo', 'port': 9200},
    {'host': 'ees010wo', 'port': 9200},
    {'host': 'ees011wo', 'port': 9200},
    {'host': 'ees012wo', 'port': 9200},
    {'host': 'ees013wo', 'port': 9200},
    {'host': 'ees014wo', 'port': 9200},
    {'host': 'ees015wo', 'port': 9200},
    {'host': 'ees016wo', 'port': 9200},
    {'host': 'ees017wo', 'port': 9200}
]
+
def parse_args():
    """Parse command-line options and return them as a dict.

    BUG FIX: the help text for --shard_count, --replica_count and --setup
    was copy-pasted from other options; corrected to describe what each
    option actually controls. Option names, types and defaults unchanged.
    """
    parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1')

    parser.add_argument('-t', '--type_count',
                        help='The number of types to produce',
                        type=int,
                        default=100)

    parser.add_argument('-ic', '--index_count',
                        help='The number of indices to create',
                        type=int,
                        default=10)

    parser.add_argument('-sc', '--shard_count',
                        help='The number of shards per index',
                        type=int,
                        default=18)

    parser.add_argument('-rc', '--replica_count',
                        help='The number of replicas per index',
                        type=int,
                        default=1)

    parser.add_argument('-w', '--workers',
                        help='The number of worker threads',
                        type=int,
                        default=8)

    parser.add_argument('-dc', '--document_count',
                        help='The number of documents per index',
                        type=long,
                        default=100000000)

    parser.add_argument('-bs', '--batch_size',
                        help='The size of batches to send to ES',
                        type=long,
                        default=25)

    parser.add_argument('-ip', '--index_prefix',
                        help='The Prefix to use for index names',
                        type=str,
                        default='apigee_ftw')

    parser.add_argument('-tp', '--type_prefix',
                        help='The Prefix to use for type names',
                        type=str,
                        default='type_this')

    parser.add_argument('-s', '--setup',
                        help='Delete and recreate the indices before indexing',
                        action='store_true')

    my_args = parser.parse_args(sys.argv[1:])

    return vars(my_args)
+
+
+args = parse_args()
+
+
+class APIClient():
+ def __init__(self, base_url):
+ self.base_url = base_url
+
+ def put(self, path='/', data=None):
+ if not data:
+ data = {}
+
+ url = '%s%s' % (self.base_url, path)
+ r = requests.put(url, json.dumps(data))
+
+ if r.status_code == 200:
+ print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling PUT on URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def index_batch(self, batch):
+
+ data = ''
+
+ for element in batch:
+ index_tuple = element[0]
+ doc = element[1]
+ data += '{ "index" : { "_index" : "%s", "_type" : "%s", "_id" : "%s" } }\n' % (
+ index_tuple[0], index_tuple[1], doc['entityId'])
+ data += json.dumps(doc)
+ data += '\n'
+
+ url = '%s/_bulk' % self.base_url
+
+ # print data
+
+ r = requests.post(url, data)
+
+ # print json.dumps(r.json(), indent=2)
+
+ if r.status_code == 200:
+ print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling POST URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def delete(self, index):
+ url = '%s%s' % (self.base_url, index)
+ r = requests.delete(url)
+
+ if r.status_code == 200:
+ print 'DELETE (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling DELETE URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def create_index(self, name=None, shards=18 * 3, replicas=1):
+ data = {
+ "settings": {
+ "index": {
+ "action": {
+ "write_consistency": "one"
+ },
+ "number_of_shards": shards,
+ "number_of_replicas": replicas
+ }
+ }
+ }
+
+ try:
+ print 'Creating index %s' % name
+ response = self.put('/%s/' % name.lower(), data)
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+ def delete_index(self, name):
+ try:
+ response = self.delete('/%s/' % name.lower())
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+ def define_type_mapping(self, index_name, type_name):
+ try:
+ url = '/%s/_mapping/%s' % (index_name, type_name)
+ print url
+
+ response = self.put(url, get_type_mapping(type_name))
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+
+class Worker(Process):
+ def __init__(self, work_queue):
+ super(Worker, self).__init__()
+ self.api_client = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host'))
+ self.work_queue = work_queue
+ self.es = Elasticsearch(es_hosts)
+ self.sentence_list = loremipsum.get_sentences(1000)
+ self.re_first_word = re.compile('([A-z]+)')
+
+ def run(self):
+ print 'Starting %s ' % self.name
+ counter = 0
+
+ batch = []
+
+ while True:
+ index_batch_size = args.get('batch_size')
+ task = self.work_queue.get(timeout=600)
+ counter += 1
+
+ document = self.generate_document(task['field_count'])
+ flattened_doc = self.process_document(document,
+ task['type'],
+ task['uuid'],
+ task['uuid'])
+
+ index_type_tuple = (task['index'], task['type'])
+
+ # self.handle_document(task['index'], task['type'], task['uuid'], flattened_doc)
+
+ batch.append((index_type_tuple, flattened_doc))
+
+ if len(batch) >= index_batch_size:
+ self.handle_batch(batch)
+ batch = []
+
+ self.work_queue.task_done()
+
+ def generate_document(self, fields):
+
+ doc = {}
+
+ my_bool = True
+
+ for i in xrange(fields):
+ sentence_index = random.randint(0, max((fields / 2) - 1, 1))
+ sentence = self.sentence_list[sentence_index]
+
+ if random.random() >= .5:
+ key = self.re_first_word.findall(sentence)[1]
+ else:
+ key = self.re_first_word.findall(sentence)[1] + str(i)
+
+ field_type = random.random()
+
+ if field_type <= 0.3:
+ doc[key] = sentence
+
+ elif field_type <= 0.5:
+ doc[key] = random.randint(1, 1000000)
+
+ elif field_type <= 0.6:
+ doc[key] = random.random() * 1000000000
+
+ elif field_type == 0.7:
+ doc[key] = my_bool
+ my_bool = not my_bool
+
+ elif field_type == 0.8:
+ doc[key] = self.generate_document(max(fields / 5, 1))
+
+ elif field_type <= 1.0:
+ doc['mylocation'] = self.generate_location()
+
+ return doc
+
+ @staticmethod
+ def get_fields(document, base_name=None):
+ fields = []
+
+ for name, value in document.iteritems():
+ if base_name:
+ field_name = '%s.%s' % (base_name, name)
+ else:
+ field_name = name
+
+ if isinstance(value, dict):
+ fields += Worker.get_fields(value, field_name)
+ else:
+ value_name = None
+ if isinstance(value, basestring):
+ value_name = 'string'
+
+ elif isinstance(value, bool):
+ value_name = 'boolean'
+
+ elif isinstance(value, (int, long)):
+ value_name = 'long'
+
+ elif isinstance(value, float):
+ value_name = 'double'
+
+ if value_name:
+ field = {
+ 'name': field_name,
+ value_name: value
+ }
+ else:
+ field = {
+ 'name': field_name,
+ 'string': str(value)
+ }
+
+ fields.append(field)
+
+ return fields
+
+
+ @staticmethod
+ def process_document(document, doc_type, application_id, uuid):
+ response = {
+ 'entityId': uuid,
+ 'entityVersion': '1',
+ 'entityType': doc_type,
+ 'applicationId': application_id,
+ 'fields': Worker.get_fields(document)
+ }
+
+ return response
+
+ def handle_document(self, index, doc_type, uuid, document):
+
+ res = self.es.create(index=index,
+ doc_type=doc_type,
+ id=uuid,
+ body=document)
+
+ print res
+
+ def generate_location(self):
+ response = {}
+
+ lat = random.random() * 90.0
+ lon = random.random() * 180.0
+
+ lat_neg_true = True if lon > .5 else False
+ lon_neg_true = True if lat > .5 else False
+
+ lat = lat * -1.0 if lat_neg_true else lat
+ lon = lon * -1.0 if lon_neg_true else lon
+
+ response['location'] = {
+ 'lat': lat,
+ 'lon': lon
+ }
+
+ return response
+
+ def handle_batch(self, batch):
+ print 'HANDLE BATCH size=%s' % len(batch)
+ # self.api_client.define_type_mapping(index, doc_type)
+ self.api_client.index_batch(batch)
+
+
def total_milliseconds(td):
    """Return the total duration of timedelta *td* in milliseconds.

    BUG FIX: the original ignored td.days, under-reporting any duration of a
    day or more (harmless for HTTP elapsed times, wrong in general).
    """
    return (td.microseconds + (td.seconds + td.days * 86400) * 1000000) / 1000
+
+
def get_type_mapping(type_name):
    """Return the Elasticsearch mapping body for *type_name*.

    Entity metadata fields are not_analyzed strings with doc_values; the
    flattened entity fields live in a nested 'fields' object; _all is
    disabled and routing is forced through entityId.
    """
    def exact_string():
        # not_analyzed, keyword-style string with doc_values enabled
        return {
            "type": "string",
            "index": "not_analyzed",
            "doc_values": True
        }

    nested_field_properties = {
        "name": exact_string(),
        "boolean": {"type": "boolean", "doc_values": True},
        "long": {"type": "long", "doc_values": True},
        "double": {"type": "double", "doc_values": True},
        "location": {
            "type": "geo_point",
            "lat_lon": True,
            "geohash": True,
            "doc_values": True
        },
        "string": {
            "type": "string",
            "norms": {"enabled": False},
            "fields": {"exact": exact_string()}
        },
        "uuid": exact_string()
    }

    properties = {
        "entityId": exact_string(),
        "entityVersion": exact_string(),
        "entityType": exact_string(),
        "applicationId": exact_string(),
        "nodeId": exact_string(),
        "edgeName": exact_string(),
        "entityNodeType": exact_string(),
        "edgeTimestamp": {"type": "long", "doc_values": True},
        "edgeSearch": exact_string(),
        "fields": {
            "type": "nested",
            "properties": nested_field_properties
        }
    }

    return {
        type_name: {
            "_routing": {
                "path": "entityId",
                "required": True
            },
            "properties": properties,
            "_all": {"enabled": False}
        }
    }
+
+
def main():
    """Drive the test: optionally (re)create the indices, then enqueue
    document-generation tasks for the Worker processes in batches."""
    INDEX_COUNT = args.get('index_count')
    TYPE_COUNT = args.get('type_count')
    SETUP = args.get('setup')

    indices = []
    types = []
    work_queue = JoinableQueue()

    # Client pinned to one randomly-chosen ES node; used only for setup.
    apiclient = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host'))

    workers = [Worker(work_queue) for x in xrange(args.get('workers'))]
    [worker.start() for worker in workers]

    try:
        # Build the type and index name lists from the configured prefixes.
        for x in xrange(TYPE_COUNT):
            type_name = '%s_%s' % (args.get('type_prefix'), x)
            types.append(type_name)

        for x in xrange(INDEX_COUNT):
            index_name = '%s_%s' % (args.get('index_prefix'), x)
            indices.append(index_name)

        if SETUP:
            print 'Running setup...'

            # Drop any existing indices first, then recreate them with the
            # requested shard/replica settings.
            for index_name in indices:
                apiclient.delete_index(index_name)

            time.sleep(1)

            for index_name in indices:
                apiclient.create_index(
                    index_name,
                    shards=args['shard_count'],
                    replicas=args['replica_count'])

            # time.sleep(5)

            # for index_name in indices:
            # for type_name in types:
            # apiclient.define_type_mapping(index_name, type_name)

            # time.sleep(5)

        total_messages = args.get('document_count')
        batch_size = 100000
        message_counter = 0
        fields = random.randint(50, 100)  # same field count for every doc

        # Enqueue batch_size tasks per index, wait for the workers to drain
        # the queue, then repeat until document_count tasks have been sent.
        while message_counter < total_messages:

            for count in xrange(batch_size):

                for index_name in indices:
                    doc_id = str(uuid.uuid1())

                    task = {
                        'field_count': fields,
                        'uuid': doc_id,
                        'index': index_name,
                        'type': types[random.randint(0, len(types) - 1)]
                    }

                    work_queue.put(task)

            print 'Joining queue counter=[%s]...' % message_counter
            work_queue.join()
            print 'Done queue counter=[%s]...' % message_counter
            message_counter += batch_size

    except KeyboardInterrupt:
        # Ctrl-C: hard-kill the worker processes.
        [worker.terminate() for worker in workers]


main()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/index_test_single_type_batch.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/index_test/index_test_single_type_batch.py b/utils/usergrid-util-python/index_test/index_test_single_type_batch.py
new file mode 100644
index 0000000..e3afdc3
--- /dev/null
+++ b/utils/usergrid-util-python/index_test/index_test_single_type_batch.py
@@ -0,0 +1,547 @@
+import json
+from multiprocessing import JoinableQueue, Process
+import random
+import re
+import traceback
+import uuid
+import time
+import sys
+
+import argparse
+import loremipsum
+import requests
+from elasticsearch import Elasticsearch
+
# Elasticsearch cluster nodes; each worker pins its HTTP client to one
# randomly-chosen host from this list.
es_hosts = [
    {'host': 'ees000wo', 'port': 9200},
    {'host': 'ees001wo', 'port': 9200},
    {'host': 'ees002wo', 'port': 9200},
    {'host': 'ees003wo', 'port': 9200},
    {'host': 'ees004wo', 'port': 9200},
    {'host': 'ees005wo', 'port': 9200},
    {'host': 'ees006wo', 'port': 9200},
    {'host': 'ees007wo', 'port': 9200},
    {'host': 'ees008wo', 'port': 9200},
    {'host': 'ees009wo', 'port': 9200},
    {'host': 'ees010wo', 'port': 9200},
    {'host': 'ees011wo', 'port': 9200},
    {'host': 'ees012wo', 'port': 9200},
    {'host': 'ees013wo', 'port': 9200},
    {'host': 'ees014wo', 'port': 9200},
    {'host': 'ees015wo', 'port': 9200},
    {'host': 'ees016wo', 'port': 9200},
    {'host': 'ees017wo', 'port': 9200}
]
+
+
def parse_args():
    """Parse command-line options and return them as a dict.

    BUG FIX: the help text for --shard_count, --replica_count and --setup
    was copy-pasted from other options; corrected to describe what each
    option actually controls. Option names, types and defaults unchanged.
    """
    parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1')

    parser.add_argument('-t', '--type_count',
                        help='The number of types to produce',
                        type=int,
                        default=50)

    parser.add_argument('-ic', '--index_count',
                        help='The number of indices to create',
                        type=int,
                        default=50)

    parser.add_argument('-sc', '--shard_count',
                        help='The number of shards per index',
                        type=int,
                        default=50)

    parser.add_argument('-rc', '--replica_count',
                        help='The number of replicas per index',
                        type=int,
                        default=1)

    parser.add_argument('-w', '--workers',
                        help='The number of worker threads',
                        type=int,
                        default=8)

    parser.add_argument('-dc', '--document_count',
                        help='The number of documents per index',
                        type=long,
                        default=100000000)

    parser.add_argument('-bs', '--batch_size',
                        help='The size of batches to send to ES',
                        type=long,
                        default=25)

    parser.add_argument('-ip', '--index_prefix',
                        help='The Prefix to use for index names',
                        type=str,
                        default='apigee_ftw')

    parser.add_argument('-tp', '--type_prefix',
                        help='The Prefix to use for type names',
                        type=str,
                        default='type_this')

    parser.add_argument('-s', '--setup',
                        help='Delete and recreate the indices before indexing',
                        action='store_true')

    my_args = parser.parse_args(sys.argv[1:])

    return vars(my_args)
+
+
+args = parse_args()
+
+
+class APIClient():
+ def __init__(self, base_url):
+ self.base_url = base_url
+
+ def put(self, path='/', data=None):
+ if not data:
+ data = {}
+
+ url = '%s%s' % (self.base_url, path)
+ r = requests.put(url, json.dumps(data))
+
+ if r.status_code == 200:
+ print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling PUT on URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def index_docs(self, index, documents, type):
+
+ data = ''
+
+ for doc in documents:
+ data += '{ "index" : { "_index" : "%s", "_type" : "%s", "_id" : "%s" } }\n' % (index, type, doc['entityId'])
+ data += json.dumps(doc)
+ data += '\n'
+
+ url = '%s/_bulk' % self.base_url
+
+ # print data
+
+ r = requests.post(url, data)
+
+ # print json.dumps(r.json(), indent=2)
+
+ if r.status_code == 200:
+ print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling POST URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def delete(self, index):
+ url = '%s%s' % (self.base_url, index)
+ r = requests.delete(url)
+
+ if r.status_code == 200:
+ print 'DELETE (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed))
+ return r.json()
+
+ raise Exception('HTTP %s calling DELETE URL=[%s]: %s' % (r.status_code, url, r.text))
+
+ def create_index(self, name=None, shards=18 * 3, replicas=1):
+ data = {
+ "settings": {
+ "index": {
+ "action": {
+ "write_consistency": "one"
+ },
+ "number_of_shards": shards,
+ "number_of_replicas": replicas
+ }
+ }
+ }
+
+ try:
+ print 'Creating index %s' % name
+ response = self.put('/%s/' % name.lower(), data)
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+ def delete_index(self, name):
+ try:
+ response = self.delete('/%s/' % name.lower())
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+ def define_type_mapping(self, index_name, type_name):
+ try:
+ url = '/%s/_mapping/%s' % (index_name, type_name)
+ print url
+
+ response = self.put(url, get_type_mapping(type_name))
+
+ print response
+
+ except Exception, e:
+ print traceback.format_exc()
+
+
+class Worker(Process):
+ def __init__(self, work_queue):
+ super(Worker, self).__init__()
+ self.api_client = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host'))
+ self.work_queue = work_queue
+ self.es = Elasticsearch(es_hosts)
+ self.sentence_list = loremipsum.get_sentences(1000)
+ self.re_first_word = re.compile('([A-z]+)')
+
+ def run(self):
+ print 'Starting %s ' % self.name
+ counter = 0
+
+ docs = {}
+
+ while True:
+ index_batch_size = args.get('batch_size')
+ task = self.work_queue.get(timeout=600)
+ counter += 1
+
+ document = self.generate_document(task['field_count'])
+ flattened_doc = self.process_document(document,
+ task['type'],
+ task['uuid'],
+ task['uuid'])
+
+ index_type_tuple = (task['index'], task['type'])
+
+ # self.handle_document(task['index'], task['type'], task['uuid'], flattened_doc)
+
+ doc_array = docs.get(index_type_tuple)
+
+ if doc_array is None:
+ doc_array = []
+ docs[index_type_tuple] = doc_array
+
+ doc_array.append(flattened_doc)
+
+ if len(doc_array) >= index_batch_size:
+ self.handle_batch(task['index'], task['type'], doc_array)
+ doc_array = []
+
+ self.work_queue.task_done()
+
+ def generate_document(self, fields):
+
+ doc = {}
+
+ my_bool = True
+
+ for i in xrange(fields):
+ sentence_index = random.randint(0, max((fields / 2) - 1, 1))
+ sentence = self.sentence_list[sentence_index]
+
+ if random.random() >= .5:
+ key = self.re_first_word.findall(sentence)[1]
+ else:
+ key = self.re_first_word.findall(sentence)[1] + str(i)
+
+ field_type = random.random()
+
+ if field_type <= 0.3:
+ doc[key] = sentence
+
+ elif field_type <= 0.5:
+ doc[key] = random.randint(1, 1000000)
+
+ elif field_type <= 0.6:
+ doc[key] = random.random() * 1000000000
+
+ elif field_type == 0.7:
+ doc[key] = my_bool
+ my_bool = not my_bool
+
+ elif field_type == 0.8:
+ doc[key] = self.generate_document(max(fields / 5, 1))
+
+ elif field_type <= 1.0:
+ doc['mylocation'] = self.generate_location()
+
+ return doc
+
+ @staticmethod
+ def get_fields(document, base_name=None):
+ fields = []
+
+ for name, value in document.iteritems():
+ if base_name:
+ field_name = '%s.%s' % (base_name, name)
+ else:
+ field_name = name
+
+ if isinstance(value, dict):
+ fields += Worker.get_fields(value, field_name)
+ else:
+ value_name = None
+ if isinstance(value, basestring):
+ value_name = 'string'
+
+ elif isinstance(value, bool):
+ value_name = 'boolean'
+
+ elif isinstance(value, (int, long)):
+ value_name = 'long'
+
+ elif isinstance(value, float):
+ value_name = 'double'
+
+ if value_name:
+ field = {
+ 'name': field_name,
+ value_name: value
+ }
+ else:
+ field = {
+ 'name': field_name,
+ 'string': str(value)
+ }
+
+ fields.append(field)
+
+ return fields
+
+ @staticmethod
+ def process_document(document, doc_type, application_id, uuid):
+ response = {
+ 'entityId': uuid,
+ 'entityVersion': '1',
+ 'entityType': doc_type,
+ 'applicationId': application_id,
+ 'fields': Worker.get_fields(document)
+ }
+
+ return response
+
+ def handle_document(self, index, doc_type, uuid, document):
+
+ res = self.es.create(index=index,
+ doc_type=doc_type,
+ id=uuid,
+ body=document)
+
+ print res
+
+ def generate_location(self):
+ response = {}
+
+ lat = random.random() * 90.0
+ lon = random.random() * 180.0
+
+ lat_neg_true = True if lon > .5 else False
+ lon_neg_true = True if lat > .5 else False
+
+ lat = lat * -1.0 if lat_neg_true else lat
+ lon = lon * -1.0 if lon_neg_true else lon
+
+ response['location'] = {
+ 'lat': lat,
+ 'lon': lon
+ }
+
+ return response
+
+ def handle_batch(self, index, doc_type, docs):
+ print 'HANDLE BATCH'
+ self.api_client.define_type_mapping(index, doc_type)
+ self.api_client.index_docs(index, docs, doc_type)
+
+
def total_milliseconds(td):
    """Return the total duration of timedelta *td* in milliseconds.

    BUG FIX: the original ignored td.days, under-reporting any duration of a
    day or more (harmless for HTTP elapsed times, wrong in general).
    """
    return (td.microseconds + (td.seconds + td.days * 86400) * 1000000) / 1000
+
+
def get_type_mapping(type_name):
    """Return the Elasticsearch mapping body for *type_name*.

    Entity metadata fields are not_analyzed strings with doc_values; the
    flattened entity fields live in a nested 'fields' object; _all is
    disabled and routing is forced through entityId.
    """
    def exact_string():
        # not_analyzed, keyword-style string with doc_values enabled
        return {
            "type": "string",
            "index": "not_analyzed",
            "doc_values": True
        }

    nested_field_properties = {
        "name": exact_string(),
        "boolean": {"type": "boolean", "doc_values": True},
        "long": {"type": "long", "doc_values": True},
        "double": {"type": "double", "doc_values": True},
        "location": {
            "type": "geo_point",
            "lat_lon": True,
            "geohash": True,
            "doc_values": True
        },
        "string": {
            "type": "string",
            "norms": {"enabled": False},
            "fields": {"exact": exact_string()}
        },
        "uuid": exact_string()
    }

    properties = {
        "entityId": exact_string(),
        "entityVersion": exact_string(),
        "entityType": exact_string(),
        "applicationId": exact_string(),
        "nodeId": exact_string(),
        "edgeName": exact_string(),
        "entityNodeType": exact_string(),
        "edgeTimestamp": {"type": "long", "doc_values": True},
        "edgeSearch": exact_string(),
        "fields": {
            "type": "nested",
            "properties": nested_field_properties
        }
    }

    return {
        type_name: {
            "_routing": {
                "path": "entityId",
                "required": True
            },
            "properties": properties,
            "_all": {"enabled": False}
        }
    }
+
+
+def main():
+ INDEX_COUNT = args.get('index_count')
+ TYPE_COUNT = args.get('type_count')
+ SETUP = args.get('setup')
+
+ indices = []
+ types = []
+ work_queue = JoinableQueue()
+
+ apiclient = APIClient('http://%s:9200' % es_hosts[random.randint(1, len(es_hosts) - 1)].get('host'))
+
+ workers = [Worker(work_queue) for x in xrange(args.get('workers'))]
+ [worker.start() for worker in workers]
+
+ try:
+ #
+ for x in xrange(TYPE_COUNT):
+ type_name = '%s_%s' % (args.get('type_prefix'), x)
+ types.append(type_name)
+
+ for x in xrange(INDEX_COUNT):
+ index_name = '%s_%s' % (args.get('index_prefix'), x)
+ indices.append(index_name)
+
+ if SETUP:
+ print 'Running setup...'
+
+ for index_name in indices:
+ apiclient.delete_index(index_name)
+
+ time.sleep(5)
+
+ for index_name in indices:
+ apiclient.create_index(
+ index_name,
+ shards=args['shard_count'],
+ replicas=args['replica_count'])
+
+ # time.sleep(5)
+
+ # for index_name in indices:
+ # for type_name in types:
+ # apiclient.define_type_mapping(index_name, type_name)
+
+ # time.sleep(5)
+
+ total_messages = args.get('document_count')
+ batch_size = 100000
+ message_counter = 0
+ fields = random.randint(50, 100)
+
+ while message_counter < total_messages:
+
+ for count in xrange(batch_size):
+
+ for index_name in indices:
+ doc_id = str(uuid.uuid1())
+
+ task = {
+ 'field_count': fields,
+ 'uuid': doc_id,
+ 'index': index_name,
+ 'type': types[random.randint(0, len(types) - 1)]
+ }
+
+ work_queue.put(task)
+
+ print 'Joining queue counter=[%s]...' % message_counter
+ work_queue.join()
+ print 'Done queue counter=[%s]...' % message_counter
+ message_counter += batch_size
+
+ except KeyboardInterrupt:
+ [worker.terminate() for worker in workers]
+
+
+main()
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/requirements.txt
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/requirements.txt b/utils/usergrid-util-python/requirements.txt
new file mode 100644
index 0000000..d15d7be
--- /dev/null
+++ b/utils/usergrid-util-python/requirements.txt
@@ -0,0 +1,4 @@
+urllib3
+usergrid
+requests
+redis
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/activity_streams/activity_streams.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/samples/activity_streams/activity_streams.py b/utils/usergrid-util-python/samples/activity_streams/activity_streams.py
new file mode 100644
index 0000000..ce38544
--- /dev/null
+++ b/utils/usergrid-util-python/samples/activity_streams/activity_streams.py
@@ -0,0 +1,132 @@
+# docs page: http://docs.apigee.com/api-baas/content/creating-activity
+
+# create user 1
+# post event for user 1
+# check feed for user 1
+
+# create user 2
+# user 2 follows user 1
+# post event for user 1
+
+# check feed for user 1
+# check feed for user 2
+import json
+
+import requests
+
# URL templates for the Usergrid REST API; filled in via str.format with
# url_data plus call-specific values.
collection_url_template = "{api_url}/{org}/{app}/{collection}"
entity_url_template = "{api_url}/{org}/{app}/{collection}/{entity_id}"
connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}"
connection_create_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}"

user_url_template = "{api_url}/{org}/{app}/users/{username}"
user_feed_url_template = "{api_url}/{org}/{app}/users/{username}/feed"
user_activity_url_template = "{api_url}/{org}/{app}/users/{username}/activities"
user_follows_url_template = "{api_url}/{org}/{app}/users/{user2}/following/users/{user1}"

# Target deployment: API endpoint, organization and application.
url_data = {
    'api_url': 'https://amer-apibaas-prod.apigee.net/appservices',
    'org': 'jwest-samples',
    'app': 'feed-example'
}

# Shared HTTP session (connection pooling across all calls).
session = requests.Session()
+
+
+def create_user(user):
+ data = {
+ 'username': user,
+ 'email': '%s@example.com' % user
+ }
+
+ url = collection_url_template.format(collection='users', **url_data)
+
+ r = session.post(url, json.dumps(data))
+
+ if r.status_code != 200:
+ print 'Error creating user [%s] at URL=[%s]: %s' % (user, url, r.text)
+
+
+def post_activity(user, text):
+ activity = {
+ "actor": {
+ "displayName": user,
+ "username": user,
+ "image": {
+ "duration": 0,
+ "height": 80,
+ "url": "http://www.gravatar.com/avatar/", "width": 80},
+ "email": "%s@example.com" % user
+ },
+ "verb": "post",
+ "content": text
+ }
+
+ url = user_activity_url_template.format(username=user, **url_data)
+
+ r = session.post(url, json.dumps(activity))
+
+ if r.status_code != 200:
+ print 'Error creating activity for user [%s] at URL=[%s]: %s' % (user, url, r.text)
+
+
+def get_feed(user):
+ url = user_feed_url_template.format(username=user, **url_data)
+
+ r = session.get(url)
+
+ if r.status_code != 200:
+ print 'Error getting feed for user [%s] at URL=[%s]: %s' % (user, url, r.text)
+
+ else:
+ print '----- START'
+ print json.dumps(r.json(), indent=2)
+ print '----- END'
+
+
+def create_follows(user, user_to_follow):
+ url = user_follows_url_template.format(user1=user, user2=user_to_follow, **url_data)
+
+ r = session.post(url)
+
+ print r.text
+
+ if r.status_code != 200:
+ print 'Error getting creating follows from user [%s] to user [%s] at URL=[%s]: %s' % (
+ user, user_to_follow, url, r.text)
+
+
+def delete_user(username):
+ url = user_url_template.format(username=username, **url_data)
+
+ r = session.post(url)
+
+ # print r.text
+
+ if r.status_code != 200:
+ print 'Error deleting user [%s] at URL=[%s]: %s' % (username, url, r.text)
+
+
# Demo flow: user1 posts, user2 follows user1, and later posts by user1
# then appear in user2's feed.
user_base = 'natgeo'

user1 = '%s_%s' % (user_base, 1)
user2 = '%s_%s' % (user_base, 2)

create_user(user1)
post_activity(user1, 'Hello World!')

get_feed(user1)

create_user(user2)
create_follows(user2, user1)
post_activity(user2, "I'm here!")
get_feed(user2)

post_activity(user1, 'SEE YA!!')

get_feed(user2)

get_feed(user1)

# Clean up the demo users.
delete_user(user1)
delete_user(user2)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/beacon-event-example.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/samples/beacon-event-example.py b/utils/usergrid-util-python/samples/beacon-event-example.py
new file mode 100644
index 0000000..fc05cdc
--- /dev/null
+++ b/utils/usergrid-util-python/samples/beacon-event-example.py
@@ -0,0 +1,196 @@
+# URL Templates for Usergrid
+import json
+import random
+
+import requests
+from multiprocessing import Process, Pool
+
+import time
+
# URL templates for the Usergrid REST API; filled in via str.format with
# url_data plus call-specific values.
collection_url_template = "{api_url}/{org}/{app}/{collection}"
entity_url_template = "{api_url}/{org}/{app}/{collection}/{entity_id}"
connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}"
connection_create_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}"

# NOTE(review): this first url_data value is immediately overridden by the
# assignment below — only the second (Pivotal sandbox) target is ever used.
url_data = {
    'api_url': 'https://amer-apibaas-prod.apigee.net/appservices',
    'org': 'jwest-samples',
    'app': 'event-example'
}

url_data = {
    'api_url': 'http://usergrid_app.cfapps-01.haas-26.pez.pivotal.io',
    'org': 'jwest',
    'app': 'sandbox'
}

# Shared HTTP session for the module-level helpers (each EventGenerator
# process builds its own).
session = requests.Session()
+
+
+class EventGenerator(Process):
+ def __init__(self, store_id, event_count, user_array, beacons):
+ super(EventGenerator, self).__init__()
+
+ self.store_id = store_id
+ self.user_array = user_array
+ self.event_count = event_count
+ self.beacons = beacons
+ self.session = requests.Session()
+ self.create_store(self.store_id)
+ self.create_users(self.user_array)
+
+ def create_store(self, store_id):
+ url = entity_url_template.format(collection='stores', entity_id=store_id, **url_data)
+
+ r = self.session.put(url, data=json.dumps({"name": store_id}))
+
+ if r.status_code != 200:
+ print 'Error creating store [%s] at URL=[%s]: %s' % (store_id, url, r.text)
+
+ def create_event(self, user, event):
+ print 'creating event: %s' % json.dumps(event)
+
+ url = collection_url_template.format(collection='general-events', **url_data)
+
+ r = self.session.post(url, data=json.dumps(event))
+
+ if r.status_code == 200:
+ res = r.json()
+ entity = res.get('entities')[0]
+ event_uuid = entity.get('uuid')
+
+ # link to user
+ create_connection_url = connection_create_url_template.format(collection='users',
+ uuid=user,
+ verb='events',
+ target_uuid=event_uuid,
+ **url_data)
+
+ r_connect = self.session.post(create_connection_url)
+
+ if r_connect.status_code == 200:
+ print 'created connection: %s' % create_connection_url
+
+ # link to store
+ create_connection_url = connection_create_url_template.format(collection='stores',
+ uuid=event.get('storeId'),
+ verb='events',
+ target_uuid=event_uuid,
+ **url_data)
+
+ r_connect = self.session.post(create_connection_url)
+
+ if r_connect.status_code == 200:
+ print 'created connection: %s' % create_connection_url
+
+ if event.get('eventType') == 'beacon':
+ # link to beacon
+ create_connection_url = connection_create_url_template.format(collection='beacons',
+ uuid=event.get('beaconId'),
+ verb='events',
+ target_uuid=event_uuid,
+ **url_data)
+
+ r_connect = self.session.post(create_connection_url)
+
+ if r_connect.status_code == 200:
+ print 'created connection: %s' % create_connection_url
+ else:
+ print 'Error creating connection at URL=[%s]: %s' % (create_connection_url, r.text)
+
+ def run(self):
+
+ for user in self.user_array:
+
+ # store 123
+ self.create_event(user, {
+ 'storeId': self.store_id,
+ 'eventType': 'enterStore'
+ })
+
+ for x in xrange(0, self.event_count):
+ beacon_number = random.randint(0, len(self.beacons) - 1)
+ beacon_name = self.beacons[beacon_number]
+
+ event = {
+ 'beaconId': '%s-%s' % (self.store_id, beacon_name),
+ 'storeId': self.store_id,
+ 'eventType': 'beacon'
+ }
+
+ self.create_event(user, event)
+
+ self.create_event(user, {
+ 'storeId': self.store_id,
+ 'eventType': 'exitStore'
+ })
+
+ def create_users(self, user_array):
+ for user in user_array:
+ self.create_user(user)
+
+ def create_user(self, user):
+ data = {
+ 'username': user,
+ 'email': '%s@example.com' % user
+ }
+
+ url = collection_url_template.format(collection='users', **url_data)
+
+ r = self.session.post(url, json.dumps(data))
+
+ if r.status_code != 200:
+ print 'Error creating user [%s] at URL=[%s]: %s' % (user, url, r.text)
+
+
+def create_entity(entity_type, entity_name):
+ url = entity_url_template.format(collection=entity_type, entity_id=entity_name, **url_data)
+ r = session.put(url, data=json.dumps({'name': entity_name}))
+
+ if r.status_code != 200:
+ print 'Error creating %s [%s] at URL=[%s]: %s' % (entity_type, entity_name, url, r.text)
+
+
def create_beacon(beacon_name):
    """Create a beacon entity named *beacon_name*."""
    create_entity('beacons', beacon_name)
+
+
def create_store(store_name):
    """Create a store entity named *store_name*."""
    create_entity('stores', store_name)
+
+
def main():
    """Create beacon and store entities in parallel, then run eight
    EventGenerator processes posting user event streams against them."""
    beacons = ["b1", "b2", "b3", "b4", "b5", "b6"]

    stores = ['store_123', 'store_456', 'store_789', 'store_901']

    beacon_names = []

    # Beacon entities are namespaced per store: '<store>-<beacon>'.
    for store in stores:
        for beacon in beacons:
            beacon_names.append('%s-%s' % (store, beacon))

    pool = Pool(16)

    pool.map(create_beacon, beacon_names)
    pool.map(create_store, stores)

    processes = [
        EventGenerator(stores[0], 100, ['jeff', 'julie'], beacons=beacons),
        EventGenerator(stores[0], 100, ['russo', 'dunker'], beacons=beacons),
        EventGenerator(stores[2], 100, ['jeff', 'julie'], beacons=beacons),
        EventGenerator(stores[2], 100, ['russo', 'dunker'], beacons=beacons),
        EventGenerator(stores[3], 100, ['jeff', 'julie'], beacons=beacons),
        EventGenerator(stores[3], 100, ['russo', 'dunker'], beacons=beacons),
        EventGenerator(stores[1], 100, ['bala', 'shankar'], beacons=beacons),
        EventGenerator(stores[1], 100, ['chet', 'anant'], beacons=beacons)
    ]

    [p.start() for p in processes]

    # Poll until every generator process has finished.
    while len([p for p in processes if p.is_alive()]) > 0:
        print 'Processors active, waiting'
        time.sleep(1)


main()
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/counter_test.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/samples/counter_test.py b/utils/usergrid-util-python/samples/counter_test.py
new file mode 100644
index 0000000..7852b26
--- /dev/null
+++ b/utils/usergrid-util-python/samples/counter_test.py
@@ -0,0 +1,31 @@
+import datetime
+import time
+import json
+
+import requests
+
+tstamp = time.gmtime() * 1000
+
+s = requests.Session()
+
+s.headers.update({'authorization': 'Bearer YWMt7AHANAKcEeaVR-EahuX8EgAAAVQ7Q56jxQjUsmhJn8rGLTth0XtRrBSIzDA'})
+s.headers.update({'content-type': 'application/json'})
+
+url = 'https://host/appservices-new/usergrid/pushtest/events'
+
+body = {
+ "timestamp": tstamp,
+ "counters": {
+ "counters.jeff.west": 1
+ }
+}
+
+r = s.post(url, data=json.dumps(body))
+
+print r.status_code
+
+time.sleep(30)
+
+r = s.get('https://host/appservices-new/usergrid/pushtest//counters?counter=counters.jeff.west')
+
+print r.text
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/setup.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/setup.py b/utils/usergrid-util-python/setup.py
new file mode 100755
index 0000000..1f19cb2
--- /dev/null
+++ b/utils/usergrid-util-python/setup.py
@@ -0,0 +1,40 @@
from setuptools import setup, find_packages

__author__ = 'Jeff West @ ApigeeCorporation'

# Package version; also embedded in the GitHub download URL below.
VERSION = '0.5.13'

setup(
    name='usergrid-tools',
    version=VERSION,
    description='Tools for working with Apache Usergrid',
    url='http://usergrid.apache.org',
    download_url="https://codeload.github.com/jwest-apigee/usergrid-util-python/zip/%s" % VERSION,
    author='Jeff West',
    author_email='jwest@apigee.com',

    # packages=['usergrid_tools', 'es_tools'],
    packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests", "sandbox"]),

    install_requires=[
        'requests',
        'usergrid>=0.1.3',
        'time_uuid',
        'argparse',
        'redis',
        'ConcurrentLogHandler',
    ],

    # Console scripts exposed on install; each maps to a module's main().
    entry_points={
        'console_scripts': [
            'usergrid_iterator = usergrid_tools.iterators.simple_iterator:main',
            'usergrid_data_migrator = usergrid_tools.migration.usergrid_data_migrator:main',
            'usergrid_data_exporter = usergrid_tools.migration.usergrid_data_exporter:main',
            'usergrid_entity_index_test = usergrid_tools.indexing.entity_index_test:main',
            'usergrid_batch_index_test = usergrid_tools.indexing.batch_index_test:main',
            'usergrid_parse_importer = usergrid_tools.parse_importer.parse_importer:main',
            # NOTE(review): this target duplicates usergrid_parse_importer and
            # looks like a copy/paste error — presumably it should point at a
            # deleter module. Confirm the intended target before changing.
            'usergrid_deleter = usergrid_tools.parse_importer.parse_importer:main',
            'usergrid_library_check = usergrid_tools.library_check:main',
        ]
    }
)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/__init__.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/usergrid_tools/__init__.py b/utils/usergrid-util-python/usergrid_tools/__init__.py
new file mode 100644
index 0000000..beed654
--- /dev/null
+++ b/utils/usergrid-util-python/usergrid_tools/__init__.py
@@ -0,0 +1,4 @@
# NOTE(review): these are Python-2-only implicit relative imports of the
# sibling subpackages; under Python 3 they would need to be
# 'from . import migration', etc.
import migration
import iterators
import indexing
import general
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/__init__.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/usergrid_tools/general/__init__.py b/utils/usergrid-util-python/usergrid_tools/general/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/deleter.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/usergrid_tools/general/deleter.py b/utils/usergrid-util-python/usergrid_tools/general/deleter.py
new file mode 100644
index 0000000..3c53cae
--- /dev/null
+++ b/utils/usergrid-util-python/usergrid_tools/general/deleter.py
@@ -0,0 +1,151 @@
+import json
+import traceback
+import requests
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+
def total_milliseconds(td):
    """Return the total duration of timedelta *td* in whole milliseconds.

    BUG FIX: the original computed only from td.microseconds and td.seconds,
    silently ignoring td.days and so under-reporting any duration of a day or
    more. Floor division preserves the integer result the original produced
    under Python 2.

    :param td: a datetime.timedelta
    :returns: integer milliseconds (floored)
    """
    return (td.days * 86400000000 + td.seconds * 1000000 + td.microseconds) // 1000
+
+
# for Apigee Developer, leave this as is. For paid BaaS instances change this to https://{your_api_url}/[appservices]
api_url = 'https://api.usergrid.com'

# specify the org[] / app[] / collection[] to delete
# Org and App level are required. If no collections are specified, all collections will be deleted
# you also need to specify the client_id and secret of each org
data_map = {
    "orgs": {
        "myOrg": {
            "apps": {
                "myApp": {
                    "collections": [
                        'examples'
                    ]
                }
            },
            # org-level management credentials used to mint an access token
            "credentials": {
                "client_id": "foo",
                "client_secret": "bar"
            }
        }
    }
}

# it is generally not a good idea to delete more than 100 at a time due to latency and resource utilization
url_template = '{api_url}/{org}/{app}/{collection}?limit=250'

# shared session; the bearer token header is attached per-org below
session = requests.Session()
+
+
+def check_response_status(response, message='', exit_on_error=True):
+ if response.status_code != 200:
+ print 'ERROR: ' + message
+ print response.text
+
+ if exit_on_error:
+ exit()
+
+
+def delete_all_collections(org, app, token):
+ url = '{api_url}/{org}/{app}'.format(api_url=api_url, org=org, app=app)
+
+ print 'Listing collections at URL: %s' % url
+
+ r = session.get(url)
+
+ if r.status_code != 200:
+ print r.text
+
+ collections = []
+
+ delete_collections(org, app, collections, token)
+
+
+def delete_collections(org, app, collections, token):
+ print 'Deleting [%s] collections: %s' % (len(collections), collections)
+
+ for collection in collections:
+ print 'Deleting collection [%s]...' % collection
+
+ keep_going = True
+
+ count_with_zero = 0
+
+ while keep_going:
+
+ url = url_template.format(api_url=api_url, org=org, app=app, collection=collection)
+
+ try:
+ response = session.get(url)
+ check_response_status(response, message='Unable to GET URL: %s' % url)
+
+ count = len(response.json().get('entities'))
+ total_ms = total_milliseconds(response.elapsed)
+
+ print 'GET %s from collection %s in %s' % (count, collection, total_ms)
+ print 'Deleting...'
+
+ response = session.delete(url)
+
+ check_response_status(response, message='UNABLE TO DELETE on URL: %s' % url)
+
+ try:
+ count = len(response.json().get('entities'))
+ total_ms = total_milliseconds(response.elapsed)
+
+ print 'Deleted %s from collection %s in %s' % (count, collection, total_ms)
+
+ if count == 0:
+ count_with_zero += 1
+ print 'Count with ZERO: %s' % count_with_zero
+
+ # if there are 10 in a row with zero entities returned, we're done
+ if count_with_zero >= 10:
+ keep_going = False
+ else:
+ count_with_zero = 0
+ except:
+ print 'Error! HTTP Status: %s response: %s' % (response.status_code, response.text)
+
+ except KeyboardInterrupt:
+ exit()
+
+ except:
+ print traceback.format_exc()
+
+
# iterate the orgs specified in the configuration above
for org, org_data in data_map.get('orgs', {}).iteritems():

    credentials = org_data.get('credentials', {})

    # OAuth client-credentials grant for the org's management token
    token_request = {
        'grant_type': 'client_credentials',
        'client_id': credentials.get('client_id'),
        'client_secret': credentials.get('client_secret'),
    }

    token_url = '{api_url}/management/token'.format(api_url=api_url)

    r = session.post(token_url, data=json.dumps(token_request))

    check_response_status(r, message='Unable to get Token at URL %s' % token_url)

    # all subsequent requests on the shared session carry this bearer token
    token = r.json().get('access_token')
    session.headers.update({'Authorization': 'Bearer ' + token})

    # iterate the apps specified in the config above
    for app, app_data in org_data.get('apps', {}).iteritems():

        collections = app_data.get('collections', [])

        # if the list of collections is empty, delete all collections
        if len(collections) == 0:
            delete_all_collections(org, app, token)

        # Otherwise, delete the specified collections
        else:
            delete_collections(org, app, collections, token)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py b/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py
new file mode 100644
index 0000000..3682d18
--- /dev/null
+++ b/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py
@@ -0,0 +1,25 @@
+from usergrid import UsergridQueryIterator
+
### This iterates a collection using GRAPH and checks whether there is more than one entity with the same name
+
+url = 'https://host/org/app/collection?access_token=foo&limit=1000'
+
+q = UsergridQueryIterator(url)
+
+name_tracker = {}
+counter = 0
+for e in q:
+ counter += 1
+
+ if counter % 1000 == 1:
+ print 'Count: %s' % counter
+
+ name = e.get('name')
+
+ if name in name_tracker:
+ name_tracker[name].append(e.get('uuid'))
+
+ print 'duplicates for name=[%s]: %s' % (name, name_tracker[name])
+
+ else:
+ name_tracker[name] = [e.get('uuid')]